Using dask.distributed as Parallel Pool Example#

This example shows how you can use any parallel pool-executor-like object to run combos - specifically the dask.distributed.Client.

[1]:
import xyzpy as xyz
import numpy as np
from dask.distributed import Client

First we instantiate the client:

[2]:
address = None  # put in an actual address like '127.0.0.1:8786' if you have a scheduler running
c = Client(address=address)
c
[2]:

Client

Cluster

  • Workers: 4
  • Cores: 8
  • Memory: 16.49 GB

The Client instance can act like a normal parallel pool since it has a submit method that returns futures:

[3]:
c.submit
[3]:
<bound method Client.submit of <Client: 'tcp://127.0.0.1:43127' processes=4 threads=8, memory=16.49 GB>>
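Because only a submit method returning future-like objects is required, a standard-library pool can stand in for the Client in exactly the same way. A minimal sketch (the square function here is just an illustration, not part of the example above):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# any pool exposing ``submit`` and returning futures can be passed
# via ``executor=``, just like the dask Client
with ThreadPoolExecutor(max_workers=2) as pool:
    fut = pool.submit(square, 7)
    print(fut.result())  # -> 49
```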

Let’s define our test function and combos:

[4]:
def rad(x, y, n):
    r = (x**n + y**n)**(1 / n)
    if r == 0.0:
        return 1
    return np.sin(r) / r

combos = {
    'x': np.linspace(-15, 15, 51),
    'y': np.linspace(-15, 15, 51),
    'n': [2, 4, 6],
}

r = xyz.Runner(rad, var_names='sinc2d')
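The special case in rad guards against division by zero at the origin, where sin(r)/r tends to 1. This can be checked with a plain-math version of the same function:

```python
import math

def rad(x, y, n):
    r = (x**n + y**n) ** (1 / n)
    if r == 0.0:
        return 1  # the limit of sin(r)/r as r -> 0
    return math.sin(r) / r

print(rad(0.0, 0.0, 2))                      # -> 1 (the guarded case)
print(abs(rad(1e-4, 0.0, 2) - 1.0) < 1e-6)   # -> True, close to the limit
```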

Now we can run (or harvest, or grow_missing) our combos, supplying the Client instance to the executor= keyword:

[5]:
r.run_combos(combos, executor=c)
100%|##########| 7803/7803 [00:11<00:00, 686.28it/s]
[5]:
<xarray.Dataset>
Dimensions:  (n: 3, x: 51, y: 51)
Coordinates:
  * x        (x) float64 -15.0 -14.4 -13.8 -13.2 -12.6 ... 13.2 13.8 14.4 15.0
  * y        (y) float64 -15.0 -14.4 -13.8 -13.2 -12.6 ... 13.2 13.8 14.4 15.0
  * n        (n) int64 2 4 6
Data variables:
    sinc2d   (x, y, n) float64 0.03308 -0.04752 -0.05369 ... -0.04752 -0.05369
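The 7803 tasks reported by the progress bar are the cartesian product of the three combo dimensions, 51 × 51 × 3. A quick check with the standard library (using plain lists of the same sizes in place of the numpy arrays, since only the lengths matter here):

```python
from itertools import product

# same sizes as the combos above; the values themselves are irrelevant
combos = {
    'x': list(range(51)),
    'y': list(range(51)),
    'n': [2, 4, 6],
}

cases = list(product(*combos.values()))
print(len(cases))  # -> 7803, matching the progress bar total
```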

That should take long enough for you to check out the client status, including the tasks being processed, at for example: http://127.0.0.1:8787/tasks.

Finally, visualize the output:

[6]:
r.last_ds.xyz.iheatmap('x', 'y', 'sinc2d', col='n')
Loading BokehJS ...