Persistent/Distributed Generation with Crop Example¶
This example shows how to use the Crop object for disk-based combo running, either for persistent progress or for distributed processing. First, let's define a very simple function, describe it with a Runner and a Harvester, and set the combos for this first set of runs.
%config InlineBackend.figure_formats = ['svg']
import xyzpy as xyz
def foo(a, b):
    return a + b, a - b

r = xyz.Runner(foo, ['sum', 'diff'])
h = xyz.Harvester(r, data_name='foo_data.h5')

combos = {
    'a': range(0, 10),
    'b': range(0, 10),
}
We could use the harvester to generate data locally. But if we want results to
be written to disk, either for persistence or to run them elsewhere, we need
to create a Crop.
c = h.Crop(name='first_run', batchsize=5)
c
<Crop(name='first_run', batchsize=5, num_batches=20)>
Sow the combos¶
A single crop is used for each set of runs/combos, with batchsize setting how many runs should be lumped together (default: 1).
We first sow the combos to disk using the Crop:
c.sow_combos(combos)
100%|##########| 100/100 [00:00<00:00, 124128.56it/s]
There is now a hidden directory containing everything the crop needs:
!ls -a
. dask-worker-space
.. 'farming example.ipynb'
'basic output example.ipynb' .ipynb_checkpoints
'complex output example.ipynb' 'visualize linear algebra.ipynb'
'crop example.ipynb' .xyz-first_run
'dask distributed example.ipynb'
And inside that are folders for the batches and results, the pickled function, and some other dumped settings:
!ls .xyz-first_run/
batches results xyz-function.clpkl xyz-settings.jbdmp
Once sown, we can check the progress of the Crop:
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
5 / 20 batches of size 5 completed
[##### ] : 25.0%
There are a hundred combinations, with a batchsize of 5, yielding 20 batches to be processed.
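As a quick check, that arithmetic can be read straight off the crop; a minimal sketch, assuming the batchsize and num_batches values shown in the repr above are available as plain attributes:
import math

# 10 values of 'a' times 10 values of 'b' gives 100 combinations
num_combos = len(combos['a']) * len(combos['b'])

# 100 combinations in batches of 5 -> 20 batches
assert math.ceil(num_combos / c.batchsize) == c.num_batches == 20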
Hint
As well as combos, you can supply cases and constants to sow_combos().
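For instance, a rough sketch of what such a call might look like (the particular values, and the dict format used for the cases, are illustrative assumptions rather than anything run here):
# illustrative only - grid over 'a', plus two specific extra cases,
# with 'b' held constant for every call
c.sow_combos(
    {'a': range(0, 10)},
    cases=[{'a': 100}, {'a': 200}],  # assumed dict-style cases
    constants={'b': 1},              # fixed argument for every run
)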
Grow the results¶
Any python process with access to the sown batches in .xyz-first_run (and the function's requirements) can grow the results (you could even zip the folder up and send it elsewhere). The process can be run in several ways:
In the .xyz-first_run folder itself, using e.g.:
python -c "import xyzpy; xyzpy.grow(i)" # with i = 1 ... 20
In the current (‘parent’) folder, one then has to use a named crop to differentiate, e.g.:
python -c "import xyzpy; crop=xyzpy.Crop(name='fist_run'); xyzpy.grow(i, crop=crop)"
Somewhere else. Then the parent directory must be specified too, e.g.:
python -c "import xyzpy; crop=xyzpy.Crop(name='first_run', parent_dir='.../xyzpy/docs/examples'); xyzpy.grow(i, crop=crop)"
To fake this happening, we can run grow ourselves (this cell could run standalone):
import xyzpy

crop = xyzpy.Crop(name='first_run')

for i in range(1, 11):
    xyzpy.grow(i, crop=crop)
xyzpy: loaded batch 1 of first_run.
{'a': 0, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3460.65it/s]
xyzpy: success - batch 1 completed.
xyzpy: loaded batch 2 of first_run.
{'a': 0, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4195.14it/s]
xyzpy: success - batch 2 completed.
xyzpy: loaded batch 3 of first_run.
{'a': 1, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 2392.64it/s]
xyzpy: success - batch 3 completed.
xyzpy: loaded batch 4 of first_run.
{'a': 1, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4371.80it/s]
xyzpy: success - batch 4 completed.
xyzpy: loaded batch 5 of first_run.
{'a': 2, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4623.35it/s]
xyzpy: success - batch 5 completed.
xyzpy: loaded batch 6 of first_run.
{'a': 2, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3317.75it/s]
xyzpy: success - batch 6 completed.
xyzpy: loaded batch 7 of first_run.
{'a': 3, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3296.37it/s]
xyzpy: success - batch 7 completed.
xyzpy: loaded batch 8 of first_run.
{'a': 3, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4092.00it/s]
xyzpy: success - batch 8 completed.
xyzpy: loaded batch 9 of first_run.
{'a': 4, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 2017.66it/s]
xyzpy: success - batch 9 completed.
xyzpy: loaded batch 10 of first_run.
{'a': 4, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4612.17it/s]
xyzpy: success - batch 10 completed.
And now we can check the progress:
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
10 / 20 batches of size 5 completed
[########## ] : 50.0%
If we were on a batch system, we could use xyzpy.Crop.grow_cluster() to automatically submit all missing batches as jobs. It is worth double checking the script that will be used first, though! This is done using xyzpy.Crop.gen_cluster_script() (a sketch of the actual submission call follows the generated scripts below):
print(c.gen_cluster_script(scheduler='sge', minutes=20, gigabytes=1))
#!/bin/bash -l
#$ -S /bin/bash
#$ -N first_run
#$ -l h_rt=0:20:0,mem=1G
#$ -l tmpfs=1G
mkdir -p /home/johnnie/Scratch/output
#$ -wd /home/johnnie/Scratch/output
#$ -pe smp None
#$ -t 1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None
conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
print('Growing:', repr(crop))
grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
grow(batch_ids[$SGE_TASK_ID - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'
The default scheduler is 'sge' (Sun Grid Engine); however, you can also specify 'pbs' (Portable Batch System) or 'slurm':
print(c.gen_cluster_script(scheduler='pbs', minutes=20, gigabytes=1))
#!/bin/bash -l
#PBS -N first_run
#PBS -lselect=None:ncpus=None:mem=1gb
#PBS -lwalltime=00:20:00
#PBS -J 1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None
conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
print('Growing:', repr(crop))
grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
grow(batch_ids[$PBS_ARRAY_INDEX - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'
print(c.gen_cluster_script(scheduler='slurm', minutes=20, gigabytes=1))
#!/bin/bash -l
#SBATCH --job-name=first_run
#SBATCH --time=00:20:00
#SBATCH --mem=1G
#SBATCH --array=1-10
echo 'XYZPY script starting...'
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=None
export MKL_NUM_THREADS=None
export OPENBLAS_NUM_THREADS=None
export NUMBA_NUM_THREADS=None
conda activate py311
read -r -d '' SCRIPT << EOM
#
from xyzpy.gen.cropping import grow, Crop
if __name__ == '__main__':
crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
print('Growing:', repr(crop))
grow_kwargs = dict(crop=crop, debugging=False, num_workers=None)
batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
grow(batch_ids[$SLURM_ARRAY_TASK_ID - 1], **grow_kwargs)
EOM
python -c "$SCRIPT"
echo 'XYZPY script finished'
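Once the generated script looks right, we could then submit the remaining batches directly; a minimal sketch, assuming grow_cluster() accepts the same scheduler and resource options as gen_cluster_script():
# submit all missing batches as an array job (requires an actual scheduler;
# options here are assumed to mirror gen_cluster_script)
c.grow_cluster(scheduler='slurm', minutes=20, gigabytes=1)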
If you are just using the Crop as a persistence mechanism, then xyzpy.Crop.grow() or xyzpy.Crop.grow_missing() will process the batches in the current process:
c.grow_missing(parallel=True) # this accepts combo_runner kwargs
100%|##########| 10/10 [00:00<00:00, 19.46it/s]
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
20 / 20 batches of size 5 completed
[####################] : 100.0%
Hint
If different function calls might take different amounts of time based on their arguments, you can supply shuffle=True to xyzpy.Crop.sow_combos(). Each batch will then be a random selection of cases, which should even out the effort each takes, as long as batchsize is not too small.
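For example, a short sketch of re-sowing the same combos with shuffling turned on:
# mix slow and fast cases within each batch by shuffling before batching
c.sow_combos(combos, shuffle=True)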
Reap the results¶
The final step is to ‘reap’ the results from disk. Because the crop was instantiated from a Harvester, that harvester will be automatically used to collect the resulting dataset and sync it with the on-disk dataset:
c.reap()
100%|##########| 100/100 [00:00<00:00, 193196.87it/s]
<xarray.Dataset>
Dimensions:  (a: 10, b: 10)
Coordinates:
  * a        (a) int64 0 1 2 3 4 5 6 7 8 9
  * b        (b) int64 0 1 2 3 4 5 6 7 8 9
Data variables:
    sum      (a, b) int64 0 1 2 3 4 5 6 7 8 9 1 ... 9 10 11 12 13 14 15 16 17 18
    diff     (a, b) int64 0 -1 -2 -3 -4 -5 -6 -7 -8 -9 1 ... 9 8 7 6 5 4 3 2 1 0
Hint
If the Crop is incomplete but has some results, you can call crop.reap(allow_incomplete=True) to harvest the existing data.
Hint
You can supply other kwargs related to harvesting, such as overwrite=True, which is useful when you want to replace existing data with newer runs without starting over.
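Put together, a hedged sketch of both options (assuming reap() forwards these keywords on to the harvesting step):
# harvest whatever batches exist so far, letting the newer values
# replace any overlapping data already on disk
ds = c.reap(allow_incomplete=True, overwrite=True)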
The dataset foo_data.h5 should be on disk, and the crop folder cleaned up:
!ls -a
. dask-worker-space
.. 'farming example.ipynb'
'basic output example.ipynb' foo_data.h5
'complex output example.ipynb' .ipynb_checkpoints
'crop example.ipynb' 'visualize linear algebra.ipynb'
'dask distributed example.ipynb'
And we can inspect the results:
h.full_ds.xyz.plot(x='a', y='b', z='diff');
Many crops can be created from the harvester at once, and when they are reaped, the results should be seamlessly combined into the on-disk dataset.
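For example, a hypothetical sketch of that workflow (the crop names and extra combos are made up for illustration):
# two independent crops from the same harvester; each reap merges its
# results into the on-disk dataset foo_data.h5
c1 = h.Crop(name='run_a', batchsize=5)
c2 = h.Crop(name='run_b', batchsize=5)

c1.sow_combos({'a': range(10, 20), 'b': range(0, 10)})
c2.sow_combos({'a': range(0, 10), 'b': range(10, 20)})

# ... grow each crop's batches, possibly on different machines ...

c1.reap()
c2.reap()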
# for now clean up
h.delete_ds()