Persistent/Distributed Generation with Crop Example

This example shows how to use the Crop object for disk-based combo running, either to persist progress or to distribute processing.

First let’s define a very simple function, describe it with a Runner and Harvester and set the combos for this first set of runs.

[1]:
import xyzpy as xyz

def foo(a, b):
    return a + b, a - b

r = xyz.Runner(foo, ['sum', 'diff'])
h = xyz.Harvester(r, data_name='foo_data.h5')

combos = {'a': range(0, 10),
          'b': range(0, 10)}

We could use the harvester to generate data locally. But if we want results to be written to disk, either for persistence or to run them elsewhere, we need to create a Crop.

[2]:
c = h.Crop(name='first_run', batchsize=5)
c
[2]:
<Crop(name='first_run', progress=*reaped or unsown*, batchsize=5)>

Sow the combos

A single crop is used for each set of runs/combos, with batchsize setting how many runs should be lumped together (default: 1). We first sow the combos to disk using the Crop:

[3]:
c.sow_combos(combos)
100%|##########| 100/100 [00:00<00:00, 57244.49it/s]

There is now a hidden directory containing everything the crop needs:

[4]:
ls -a
 ./                              'dask distributed example.ipynb'*
 ../                              dask-worker-space/
'basic output example.ipynb'*    'farming example.ipynb'*
'complex output example.ipynb'*   .ipynb_checkpoints/
'crop example.ipynb'*             .xyz-first_run/

And inside that are folders for the batches and results, the pickled function, and some other dumped settings:

[5]:
ls .xyz-first_run/
batches/  results/  xyz-function.clpkl  xyz-settings.jbdmp

Once sown, we can check the progress of the Crop:

[6]:
c
[6]:
<Crop(name='first_run', progress=0/20, batchsize=5)>

There are a hundred combinations, with a batchsize of 5, yielding 20 batches to be processed.
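
The batch count can be checked by hand. The following is only a sketch of the arithmetic, not xyzpy's internal implementation: it assumes the combos are enumerated as a plain Cartesian product with a varying slowest, and the helper name chunk is hypothetical.

```python
import itertools

combos = {'a': range(0, 10), 'b': range(0, 10)}
batchsize = 5

# Assumption: every combination is enumerated as a Cartesian product,
# with 'a' varying slowest and 'b' fastest.
keys = list(combos)
settings = [dict(zip(keys, values))
            for values in itertools.product(*combos.values())]


def chunk(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


batches = chunk(settings, batchsize)
print(len(settings), len(batches))  # number of runs, number of batches
print(batches[0][-1])               # last setting in the first batch
```

Under that ordering assumption, the last setting of the first batch is {'a': 0, 'b': 4}, which matches the label printed for Batch 1 when the batches are grown later in this example.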

Hint

As well as combos you can supply cases and constants to sow_combos().

Grow the results

Any Python process with access to the sown batches in .xyz-first_run (and the function requirements) can grow the results (you could even zip the folder up and send it elsewhere). The process can be run in several ways:

  1. In the .xyz-first_run folder itself, using e.g.:

python -c "import xyzpy; xyzpy.grow(i)"  # with i = 1 ... 20

  2. In the current (‘parent’) folder, where a named crop must be used to identify which crop to grow, e.g.:

python -c "import xyzpy; crop=xyzpy.Crop(name='first_run'); xyzpy.grow(i, crop=crop)"

  3. Somewhere else. The parent directory must then be specified too, e.g.:

python -c "import xyzpy; crop=xyzpy.Crop(name='first_run', parent_dir='.../xyzpy/docs/examples'); xyzpy.grow(i, crop=crop)"
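
In the commands above, i is a placeholder that has to be filled in for each batch. One possible way to generate the full set of command lines is sketched here; the helper grow_commands is hypothetical and not part of xyzpy, and it only builds strings from the Crop(name=..., parent_dir=...) and grow(i, crop=crop) calls already shown, without importing xyzpy itself.

```python
import shlex


def grow_commands(crop_name, num_batches, parent_dir=None):
    """Build one shell command per batch, mirroring the one-liners above."""
    commands = []
    for i in range(1, num_batches + 1):  # batch numbers start at 1
        crop_args = f"name={crop_name!r}"
        if parent_dir is not None:
            crop_args += f", parent_dir={parent_dir!r}"
        code = (f"import xyzpy; crop = xyzpy.Crop({crop_args}); "
                f"xyzpy.grow({i}, crop=crop)")
        # shlex.join quotes the python snippet safely for a POSIX shell
        commands.append(shlex.join(["python", "-c", code]))
    return commands


commands = grow_commands("first_run", 20)
print(len(commands))
print(commands[0])
```

Each resulting string could then be passed to whatever job launcher is available; how the commands are dispatched is left open here.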

To simulate this, we can run grow ourselves (this cell could stand alone):

[7]:
import xyzpy
crop = xyzpy.Crop(name='first_run')
for i in range(1, 11):
    xyzpy.grow(i, crop=crop)
Batch 1: {'a': 0, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3173.66it/s]
Batch 2: {'a': 0, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3541.29it/s]
Batch 3: {'a': 1, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3522.85it/s]
Batch 4: {'a': 1, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3885.05it/s]
Batch 5: {'a': 2, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4481.09it/s]
Batch 6: {'a': 2, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4080.06it/s]
Batch 7: {'a': 3, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4286.90it/s]
Batch 8: {'a': 3, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4597.00it/s]
Batch 9: {'a': 4, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3534.13it/s]
Batch 10: {'a': 4, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4688.47it/s]

And now we can check the progress:

[8]:
print(c)

/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
10 / 20 batches of size 5 completed
[##########          ] : 50.0%

If we were on a batch system we could use xyzpy.Crop.grow_cluster() to automatically submit all missing batches as jobs. It is worth double checking the script that is used first though! This is done using xyzpy.Crop.gen_cluster_script():

[9]:
print(c.gen_cluster_script(scheduler='sge', minutes=20, gigabytes=1))
#!/bin/bash -l
#$ -S /bin/bash
#$ -l h_rt=0:20:0,mem=1G
#$ -l tmpfs=1G

#$ -N first_run
mkdir -p /home/johnnie/Scratch/output
#$ -wd /home/johnnie/Scratch/output
#$ -pe smp 1
#$ -t 1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SGE_TASK_ID - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile

The default scheduler is 'sge' (Sun Grid Engine), but you can also specify 'pbs' (Portable Batch System) or 'slurm':

[10]:
print(c.gen_cluster_script(scheduler='pbs', minutes=20, gigabytes=1))
#!/bin/bash -l
#PBS -lselect=1:ncpus=1:mem=1gb
#PBS -lwalltime=00:20:00

#PBS -N first_run
#PBS -J 1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$PBS_ARRAY_INDEX - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile

[11]:
print(c.gen_cluster_script(scheduler='slurm', minutes=20, gigabytes=1))
#!/bin/bash -l
#SBATCH --nodes=1
#SBATCH --mem=1gb
#SBATCH --cpus-per-task=1
#SBATCH --time=00:20:00

#SBATCH --job-name=first_run
#SBATCH --array=1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SLURM_ARRAY_TASK_ID - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile
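
In each generated script, the scheduler's array index (1 to 10) selects an entry from the tuple of batch ids that have not yet been grown. A minimal sketch of that lookup in plain Python, with the completed set hard-coded to match this example and batch_for_task a hypothetical helper name:

```python
num_batches = 20
completed = set(range(1, 11))  # batches 1-10 were grown earlier in this example

# The batches still missing, in ascending order.
batch_ids = tuple(i for i in range(1, num_batches + 1) if i not in completed)


def batch_for_task(task_id):
    """Map a 1-based array task id onto a missing batch id."""
    return batch_ids[task_id - 1]


for task_id in (1, 10):
    print(task_id, '->', batch_for_task(task_id))
```

This is why the array range in the scripts is 1-10 rather than 11-20: the job array simply counts the missing batches, and the lookup translates each task into the batch it should grow.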

If you are just using the Crop as a persistence mechanism, then xyzpy.Crop.grow() or xyzpy.Crop.grow_missing() will process the batches in the current process:

[12]:
c.grow_missing(parallel=True)  #  this accepts combo_runner kwargs
100%|##########| 10/10 [00:00<00:00, 10.66it/s]
[13]:
print(c)

/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
20 / 20 batches of size 5 completed
[####################] : 100.0%

Hint

If different function calls might take different amounts of time based on their arguments, you can supply shuffle=True to xyzpy.Crop.sow_combos(). Each batch will then be a random selection of cases, which should even out the effort each takes as long as batchsize is not too small.
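
To illustrate why shuffling helps, here is a small standalone sketch that does not call xyzpy. The cost model, in which a run takes time proportional to a, and the names cost, batch_costs and spread are assumptions made purely for illustration.

```python
import itertools
import random

batchsize = 5
settings = list(itertools.product(range(10), range(10)))  # (a, b) pairs


def cost(setting):
    """Hypothetical cost model: runtime grows with a."""
    a, _ = setting
    return a


def batch_costs(items):
    """Total cost of each consecutive batch of `batchsize` items."""
    return [sum(cost(s) for s in items[i:i + batchsize])
            for i in range(0, len(items), batchsize)]


def spread(costs):
    """Difference between the most and least expensive batch."""
    return max(costs) - min(costs)


ordered = batch_costs(settings)

shuffled_settings = settings[:]
random.Random(0).shuffle(shuffled_settings)  # fixed seed for repeatability
shuffled = batch_costs(shuffled_settings)

print('ordered spread :', spread(ordered))
print('shuffled spread:', spread(shuffled))
```

With the ordered layout, the cheapest batch costs 0 and the most expensive 45 under this model, so some jobs finish almost immediately while others do all the work. After shuffling, the batch totals typically sit much closer together.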

Reap the results

The final step is to ‘reap’ the results from disk. Because the crop was instantiated from a Harvester, that harvester will be automatically used to collect the resulting dataset and sync it with the on-disk dataset:

[14]:
c.reap()
100%|##########| 100/100 [00:00<00:00, 89013.24it/s]
[14]:
<xarray.Dataset>
Dimensions:  (a: 10, b: 10)
Coordinates:
  * a        (a) int64 0 1 2 3 4 5 6 7 8 9
  * b        (b) int64 0 1 2 3 4 5 6 7 8 9
Data variables:
    sum      (a, b) int64 0 1 2 3 4 5 6 7 8 9 1 ... 9 10 11 12 13 14 15 16 17 18
    diff     (a, b) int64 0 -1 -2 -3 -4 -5 -6 -7 -8 -9 1 ... 9 8 7 6 5 4 3 2 1 0
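
As a sanity check on the values above, the same grids can be rebuilt in plain Python. This sketch does not reproduce how reap() works; it just evaluates foo over every combination and lays the outputs out on an (a, b) grid, with nested lists standing in for the xarray variables.

```python
def foo(a, b):
    return a + b, a - b


a_values = range(0, 10)
b_values = range(0, 10)

# grid[i][j] holds the output for a = a_values[i], b = b_values[j]
sum_grid = [[foo(a, b)[0] for b in b_values] for a in a_values]
diff_grid = [[foo(a, b)[1] for b in b_values] for a in a_values]

print(sum_grid[0])    # first row of 'sum'  (a = 0)
print(diff_grid[-1])  # last row of 'diff' (a = 9)
```

The first row of sum and the last row of diff can be compared with the start and end of the flattened values in the dataset summary.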

The dataset foo_data.h5 should be on disk, and the crop folder cleaned up:

[15]:
ls -a
 ./                              'dask distributed example.ipynb'*
 ../                              dask-worker-space/
'basic output example.ipynb'*    'farming example.ipynb'*
'complex output example.ipynb'*   foo_data.h5
'crop example.ipynb'*             .ipynb_checkpoints/

And we can inspect the results:

[16]:
h.full_ds.xyz.iheatmap('a', 'b', 'diff')
[Interactive Bokeh heatmap of diff against a and b]