Using dask.distributed as a Parallel Pool Example
This example shows how you can use any parallel pool-like executor object to run combos - specifically the dask.distributed.Client.
%config InlineBackend.figure_formats = ['svg']
import xyzpy as xyz
import numpy as np
from dask.distributed import Client
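Since any pool-like object with a submit method returning futures can be used, the standard library's process pool (a hypothetical alternative, not used in this example) should work too:
pool = ProcessPoolExecutor(max_workers=4)  # also has a ``submit`` method returning futures
from concurrent.futures import ProcessPoolExecutor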
First we instantiate the client:
address = None # put in an actual address like '127.0.0.1:8786' if you have a scheduler running
c = Client(address=address)
c
<Client: 'tcp://127.0.0.1:33273' processes=4 threads=8, memory=15.33 GiB>
LocalCluster: 4 workers, 2 threads and 3.83 GiB memory each, dashboard at http://127.0.0.1:8787/status
The Client instance can act like a normal parallel pool since it has a submit method and returns futures:
c.submit
<bound method Client.submit of <Client: 'tcp://127.0.0.1:33273' processes=4 threads=8, memory=15.33 GiB>>
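As a quick illustration (an addition, not part of the original notebook), submitting a simple function returns a distributed Future whose result can then be retrieved:
fut = c.submit(sum, [1, 2, 3])  # returns a Future immediately
fut.result()  # blocks until the task completes -> 6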
Let’s define our test function and combos:
def rad(x, y, n):
    # radial distance under the n-norm
    r = (x**n + y**n)**(1 / n)
    if r == 0.0:
        # limit of sin(r) / r at the origin
        return 1
    return np.sin(r) / r

combos = {
    'x': np.linspace(-15, 15, 51),
    'y': np.linspace(-15, 15, 51),
    'n': [2, 4, 6],
}
r = xyz.Runner(rad, var_names='sinc2d')
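As a quick sanity check (an addition, not from the original notebook), the total number of cases is the product of the combo lengths, which matches the 7803 shown in the progress bar below:
np.prod([len(v) for v in combos.values()])  # 51 * 51 * 3 -> 7803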
Now we can run (or harvest, or grow_missing) our combos, supplying the Client instance to the executor= keyword:
r.run_combos(combos, executor=c)
100%|##########| 7803/7803 [00:23<00:00, 326.77it/s]
<xarray.Dataset>
Dimensions:  (x: 51, y: 51, n: 3)
Coordinates:
  * x        (x) float64 -15.0 -14.4 -13.8 -13.2 -12.6 ... 13.2 13.8 14.4 15.0
  * y        (y) float64 -15.0 -14.4 -13.8 -13.2 -12.6 ... 13.2 13.8 14.4 15.0
  * n        (n) int64 2 4 6
Data variables:
    sinc2d   (x, y, n) float64 0.03308 -0.04752 -0.05369 ... -0.04752 -0.05369
That should take long enough for you to check out the client status, including the tasks being processed, at for example: http://127.0.0.1:8787/tasks.
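The harvest variant mentioned above works the same way - a minimal sketch, assuming you want to accumulate the results on disk (the filename 'sinc2d.h5' here is hypothetical):
h = xyz.Harvester(r, data_name='sinc2d.h5')  # hypothetical on-disk store
h.harvest_combos(combos, executor=c)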
Finally, visualize the output:
r.last_ds.xyz.infiniplot('x', 'y', z='sinc2d', col='n', palette="turbo")
(<Figure size 900x300 with 3 Axes>,
array([[<Axes: xlabel='x', ylabel='y'>, <Axes: xlabel='x'>,
<Axes: xlabel='x'>]], dtype=object))
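When you are done, the client (and the local cluster it started) can be shut down:
c.close()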