Persistent/Distributed Generation with Crop Example#

This example shows how to use the Crop object to run combos with intermediate results stored on disk - useful either for persisting progress or for distributing the work across processes and machines.

First, let’s define a very simple function, describe it with a Runner and a Harvester, and set the combos for this first set of runs.

%config InlineBackend.figure_formats = ['svg']
import xyzpy as xyz


def foo(a, b):
    return a + b, a - b


r = xyz.Runner(foo, ['sum', 'diff'])
h = xyz.Harvester(r, data_name='foo_data.h5')

combos = {
    'a': range(0, 10),
    'b': range(0, 10),
}

We could use the harvester to generate data locally. But if we want the runs to be written to disk, either for persistence or so they can be performed elsewhere, we need to create a Crop.

c = h.Crop(name='first_run', batchsize=5)
c
<Crop(name='first_run', batchsize=5, num_batches=20)>
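Note that a Crop can also be constructed directly, without a Harvester - a minimal sketch, assuming xyzpy.Crop accepts the function and batchsize as keyword arguments (this standalone crop is not used in the rest of the example):

# hedged sketch: a crop tied only to a function, with no Harvester
c2 = xyz.Crop(fn=foo, name='standalone_run', batchsize=5)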

Sow the combos#

A single crop is used for each set of runs/combos, with batchsize setting how many runs should be lumped together (default: 1). We first sow the combos to disk using the Crop:

c.sow_combos(combos)
100%|##########| 100/100 [00:00<00:00, 124128.56it/s]

There is now a hidden directory containing everything the crop needs:

!ls -a
 .				   dask-worker-space
 ..				  'farming example.ipynb'
'basic output example.ipynb'	   .ipynb_checkpoints
'complex output example.ipynb'	  'visualize linear algebra.ipynb'
'crop example.ipynb'		   .xyz-first_run
'dask distributed example.ipynb'

And inside that are folders for the batches and results, the pickled function, and some other dumped settings:

!ls .xyz-first_run/
batches  results  xyz-function.clpkl  xyz-settings.jbdmp
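From Python, the crop's folder can also be found without shell commands - here location is an assumed name for the attribute holding the path that print(c) displays:

# hedged: where the crop stores its batches and results
print(c.location)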

Once sown, we can check the progress of the Crop:

print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
5 / 20 batches of size 5 completed
[#####               ] : 25.0%

There are one hundred combinations which, with a batchsize of 5, yield 20 batches to be processed.
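The arithmetic is simple to verify (num_batches matches the repr shown earlier; the 5 is just the batchsize we chose):

num_runs = len(combos['a']) * len(combos['b'])  # 10 * 10 = 100
assert num_runs // 5 == c.num_batches           # 100 // 5 = 20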

Hint

As well as combos you can supply cases and constants to sow_combos().
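For instance, a hedged sketch following that hint (the extra case is purely illustrative, and the call is commented out since this crop's combos are already sown):

# sow specific extra (a, b) pairs alongside the gridded combos
# c.sow_combos(combos, cases=[{'a': 100, 'b': 100}])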

Grow the results#

Any Python process with access to the sown batches in .xyz-first_run (and the function's requirements) can grow the results - you could even zip the folder up and send it elsewhere. The process can be run in several ways:

  1. In the .xyz-first_run folder itself, using e.g.:

python -c "import xyzpy; xyzpy.grow(i)"  # with i = 1 ... 20

  2. In the current (‘parent’) folder, in which case one has to use a named crop to differentiate, e.g.:

python -c "import xyzpy; crop=xyzpy.Crop(name='first_run'); xyzpy.grow(i, crop=crop)"

  3. Somewhere else entirely, in which case the parent directory must be specified too, e.g.:

python -c "import xyzpy; crop=xyzpy.Crop(name='first_run', parent_dir='.../xyzpy/docs/examples'); xyzpy.grow(i, crop=crop)"

To simulate this happening we can run grow ourselves (this cell could stand alone):

import xyzpy
crop = xyzpy.Crop(name='first_run')
for i in range(1, 11):
    xyzpy.grow(i, crop=crop)
xyzpy: loaded batch 1 of first_run.
{'a': 0, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3460.65it/s]
xyzpy: success - batch 1 completed.
xyzpy: loaded batch 2 of first_run.
{'a': 0, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4195.14it/s]
xyzpy: success - batch 2 completed.
xyzpy: loaded batch 3 of first_run.
{'a': 1, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 2392.64it/s]
xyzpy: success - batch 3 completed.
xyzpy: loaded batch 4 of first_run.
{'a': 1, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4371.80it/s]
xyzpy: success - batch 4 completed.
xyzpy: loaded batch 5 of first_run.
{'a': 2, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4623.35it/s]
xyzpy: success - batch 5 completed.
xyzpy: loaded batch 6 of first_run.
{'a': 2, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3317.75it/s]
xyzpy: success - batch 6 completed.
xyzpy: loaded batch 7 of first_run.
{'a': 3, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3296.37it/s]
xyzpy: success - batch 7 completed.
xyzpy: loaded batch 8 of first_run.
{'a': 3, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4092.00it/s]
xyzpy: success - batch 8 completed.
xyzpy: loaded batch 9 of first_run.
{'a': 4, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 2017.66it/s]
xyzpy: success - batch 9 completed.
xyzpy: loaded batch 10 of first_run.
{'a': 4, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4612.17it/s]
xyzpy: success - batch 10 completed.
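In a genuinely distributed setting these calls would run in separate processes or on separate machines instead; here is a standalone sketch using only the standard library (for illustration - it is not executed in this notebook):

# fan grow calls out over local worker processes, mimicking
# several independent shell invocations of xyzpy.grow
from concurrent.futures import ProcessPoolExecutor

import xyzpy

def grow_one(i):
    crop = xyzpy.Crop(name='first_run')
    xyzpy.grow(i, crop=crop)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(grow_one, range(1, 21)))  # all 20 batches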

And now we can check the progress:

print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
10 / 20 batches of size 5 completed
[##########          ] : 50.0%
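There is also a programmatic check of whether every batch has a result - is_ready_to_reap here is an assumption about the Crop API:

# hedged: should be False at this point, as 10 batches remain
c.is_ready_to_reap()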

If we were on a batch system we could use xyzpy.Crop.grow_cluster() to automatically submit all missing batches as jobs. It is worth double-checking the script that will be used first though! This can be done with xyzpy.Crop.gen_cluster_script():

print(c.gen_cluster_script(scheduler='sge', minutes=20, gigabytes=1))
#!/bin/bash -l
#$ -S /bin/bash
#$ -N first_run
#$ -l h_rt=0:20:0,mem=1G
#$ -l tmpfs=1G
mkdir -p /home/johnnie/Scratch/output
#$ -wd /home/johnnie/Scratch/output
#$ -pe smp None

#$ -t 1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None

conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    print('Growing:', repr(crop))
    grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SGE_TASK_ID - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'
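If the script looks right, the submission itself mirrors the same arguments (commented out here, since this notebook is not running on a cluster):

# hedged sketch: submit all missing batches as an SGE array job
# c.grow_cluster(scheduler='sge', minutes=20, gigabytes=1)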

The default scheduler is 'sge' (Sun Grid Engine), but you can also specify 'pbs' (Portable Batch System) or 'slurm':

print(c.gen_cluster_script(scheduler='pbs', minutes=20, gigabytes=1))
#!/bin/bash -l
#PBS -N first_run
#PBS -lselect=None:ncpus=None:mem=1gb
#PBS -lwalltime=00:20:00

#PBS -J 1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None

conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    print('Growing:', repr(crop))
    grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$PBS_ARRAY_INDEX - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'

And for 'slurm':

print(c.gen_cluster_script(scheduler='slurm', minutes=20, gigabytes=1))
#!/bin/bash -l
#SBATCH --job-name=first_run
#SBATCH --time=00:20:00
#SBATCH --mem=1G
#SBATCH --array=1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None

conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    print('Growing:', repr(crop))
    grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SLURM_ARRAY_TASK_ID - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'

If you are just using the Crop as a persistence mechanism, then xyzpy.Crop.grow() or xyzpy.Crop.grow_missing() will process the batches in the current process:

c.grow_missing(parallel=True)  # this accepts combo_runner kwargs
100%|##########| 10/10 [00:00<00:00, 19.46it/s]
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
20 / 20 batches of size 5 completed
[####################] : 100.0%
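Since grow_missing forwards combo_runner keyword arguments, one could for example cap the parallelism explicitly - num_workers here is an assumption based on combo_runner's options:

# hedged: limit the number of worker processes used to grow
# c.grow_missing(parallel=True, num_workers=4)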

Hint

If different function calls take different amounts of time depending on their arguments, you can supply shuffle=True to xyzpy.Crop.sow_combos(). Each batch will then contain a random selection of cases, which should even out the time each batch takes, as long as batchsize is not too small.
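Concretely (commented out, since this crop is already fully grown):

# randomize which cases land in which batch to balance runtimes
# c.sow_combos(combos, shuffle=True)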

Reap the results#

The final step is to ‘reap’ the results from disk. Because the crop was instantiated from a Harvester, that harvester will be automatically used to collect the resulting dataset and sync it with the on-disk dataset:

c.reap()
100%|##########| 100/100 [00:00<00:00, 193196.87it/s]
<xarray.Dataset>
Dimensions:  (a: 10, b: 10)
Coordinates:
  * a        (a) int64 0 1 2 3 4 5 6 7 8 9
  * b        (b) int64 0 1 2 3 4 5 6 7 8 9
Data variables:
    sum      (a, b) int64 0 1 2 3 4 5 6 7 8 9 1 ... 9 10 11 12 13 14 15 16 17 18
    diff     (a, b) int64 0 -1 -2 -3 -4 -5 -6 -7 -8 -9 1 ... 9 8 7 6 5 4 3 2 1 0

Hint

If the Crop is incomplete but has some results, you can call crop.reap(allow_incomplete=True) to harvest the existing data.

Hint

You can supply other kwargs related to harvesting such as overwrite=True, which is useful when you want to replace existing data with newer runs without starting over.
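For example (commented out, as the reap above has already consumed this crop):

# replace matching entries in the on-disk dataset with the new runs
# c.reap(overwrite=True)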

The dataset foo_data.h5 should be on disk, and the crop folder cleaned up:

!ls -a
 .				   dask-worker-space
 ..				  'farming example.ipynb'
'basic output example.ipynb'	   foo_data.h5
'complex output example.ipynb'	   .ipynb_checkpoints
'crop example.ipynb'		  'visualize linear algebra.ipynb'
'dask distributed example.ipynb'

And we can inspect the results:

h.full_ds.xyz.plot(x='a', y='b', z='diff');
[figure: SVG heatmap of diff plotted over a and b]

Many crops can be created from the harvester at once, and when they are reaped, the results should be seamlessly combined into the on-disk dataset.
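A hedged sketch of that pattern - the crop names and combos here are hypothetical:

# two independent crops backed by the same harvester
ca = h.Crop(name='run_a', batchsize=5)
cb = h.Crop(name='run_b', batchsize=5)

ca.sow_combos({'a': range(10), 'b': range(10, 20)})
cb.sow_combos({'a': range(10), 'b': range(20, 30)})

# ... grow both crops, possibly on different machines ...

# each reap then merges its results into h's on-disk dataset
# ca.reap()
# cb.reap()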

# for now clean up
h.delete_ds()