Persistent/Distributed Generation with Crop Example
This example shows how to use the Crop object for disk-based combo running, either for persistent progress or distributed processing. First let’s define a very simple function, describe it with a Runner and a Harvester, and set the combos for this first set of runs.
[1]:
import xyzpy as xyz

def foo(a, b):
    return a + b, a - b

r = xyz.Runner(foo, ['sum', 'diff'])
h = xyz.Harvester(r, data_name='foo_data.h5')

combos = {'a': range(0, 10),
          'b': range(0, 10)}
We could use the harvester to generate data locally. But if we want results to be written to disk, either for persistence or to run them elsewhere, we need to create a Crop.
[2]:
c = h.Crop(name='first_run', batchsize=5)
c
[2]:
<Crop(name='first_run', progress=*reaped or unsown*, batchsize=5)>
Sow the combos

A single crop is used for each set of runs/combos, with batchsize setting how many runs should be lumped together (default: 1). We first sow the combos to disk using the Crop:
[3]:
c.sow_combos(combos)
100%|##########| 100/100 [00:00<00:00, 57244.49it/s]
There is now a hidden directory containing everything the crop needs:
[4]:
ls -a
./ 'dask distributed example.ipynb'*
../ dask-worker-space/
'basic output example.ipynb'* 'farming example.ipynb'*
'complex output example.ipynb'* .ipynb_checkpoints/
'crop example.ipynb'* .xyz-first_run/
And inside that are folders for the batches and results, the pickled function, and some other dumped settings:
[5]:
ls .xyz-first_run/
batches/ results/ xyz-function.clpkl xyz-settings.jbdmp
Once sown, we can check the progress of the Crop:
[6]:
c
[6]:
<Crop(name='first_run', progress=0/20, batchsize=5)>
There are a hundred combinations, with a batchsize of 5, yielding 20 batches to be processed.
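As a quick sanity check, the batch count follows directly from the combos and the batchsize (plain Python, nothing xyzpy-specific):

import math

n_runs = 10 * 10                   # len(combos['a']) * len(combos['b'])
n_batches = math.ceil(n_runs / 5)  # batchsize=5
print(n_runs, n_batches)           # 100 20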
Hint

As well as combos, you can supply cases and constants to sow_combos().
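For instance, a hypothetical variation on the sowing above (not run here); the exact accepted forms for cases follow xyzpy's combo_runner and are an assumption:

# sweep 'a' as a combo, but only for two explicit settings of 'b'
c.sow_combos(combos={'a': range(0, 10)},
             cases=[{'b': 2}, {'b': 7}])

# or pin an argument to a single value for every run via constants
c.sow_combos(combos={'a': range(0, 10)},
             constants={'b': 5})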
Grow the results

Any Python process with access to the sown batches in .xyz-first_run (and the function requirements) can grow the results (you could even zip the folder up and send it elsewhere). The process can be run in several ways:
In the .xyz-first_run folder itself, using e.g.:
python -c "import xyzpy; xyzpy.grow(i)"  # with i = 1 ... 20
In the current (‘parent’) folder, where one then has to use a named crop to differentiate, e.g.:
python -c "import xyzpy; crop = xyzpy.Crop(name='first_run'); xyzpy.grow(i, crop=crop)"
Somewhere else entirely, in which case the parent directory must be specified too, e.g.:
python -c "import xyzpy; crop = xyzpy.Crop(name='first_run', parent_dir='.../xyzpy/docs/examples'); xyzpy.grow(i, crop=crop)"
To fake this happening, we can run grow ourselves (this cell could run standalone):
[7]:
import xyzpy

crop = xyzpy.Crop(name='first_run')

for i in range(1, 11):
    xyzpy.grow(i, crop=crop)
Batch 1: {'a': 0, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3173.66it/s]
Batch 2: {'a': 0, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3541.29it/s]
Batch 3: {'a': 1, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3522.85it/s]
Batch 4: {'a': 1, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 3885.05it/s]
Batch 5: {'a': 2, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4481.09it/s]
Batch 6: {'a': 2, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4080.06it/s]
Batch 7: {'a': 3, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 4286.90it/s]
Batch 8: {'a': 3, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4597.00it/s]
Batch 9: {'a': 4, 'b': 4}: 100%|##########| 5/5 [00:00<00:00, 3534.13it/s]
Batch 10: {'a': 4, 'b': 9}: 100%|##########| 5/5 [00:00<00:00, 4688.47it/s]
And now we can check the progress:
[8]:
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
10 / 20 batches of size 5 completed
[########## ] : 50.0%
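The same numbers can be read programmatically; a minimal sketch, assuming the attribute names num_results and num_sown_batches (check the xyzpy API reference):

# assumed attributes: completed results vs. total sown batches
print(f'{c.num_results} / {c.num_sown_batches} batches done')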
If we were on a batch system, we could use xyzpy.Crop.grow_cluster() to automatically submit all missing batches as jobs. It is worth double-checking the script that will be used first though! This is done with xyzpy.Crop.gen_cluster_script():
[9]:
print(c.gen_cluster_script(scheduler='sge', minutes=20, gigabytes=1))
#!/bin/bash -l
#$ -S /bin/bash
#$ -l h_rt=0:20:0,mem=1G
#$ -l tmpfs=1G
#$ -N first_run
mkdir -p /home/johnnie/Scratch/output
#$ -wd /home/johnnie/Scratch/output
#$ -pe smp 1
#$ -t 1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SGE_TASK_ID - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile
The default scheduler is 'sge' (Sun Grid Engine), but you can also specify 'pbs' (Portable Batch System) or 'slurm':
[10]:
print(c.gen_cluster_script(scheduler='pbs', minutes=20, gigabytes=1))
#!/bin/bash -l
#PBS -lselect=1:ncpus=1:mem=1gb
#PBS -lwalltime=00:20:00
#PBS -N first_run
#PBS -J 1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$PBS_ARRAY_INDEX - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile
[11]:
print(c.gen_cluster_script(scheduler='slurm', minutes=20, gigabytes=1))
#!/bin/bash -l
#SBATCH --nodes=1
#SBATCH --mem=1gb
#SBATCH --cpus-per-task=1
#SBATCH --time=00:20:00
#SBATCH --job-name=first_run
#SBATCH --array=1-10
cd /media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
tmpfile=$(mktemp .xyzpy-qsub.XXXXXXXX)
cat <<EOF > $tmpfile
#
from xyzpy.gen.batch import grow, Crop
if __name__ == '__main__':
    crop = Crop(name='first_run', parent_dir='/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples')
    batch_ids = (11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
    grow(batch_ids[$SLURM_ARRAY_TASK_ID - 1], crop=crop, debugging=False)
EOF
python $tmpfile
rm $tmpfile
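Once the generated script looks right, submission itself is a single call. A hedged sketch, assuming grow_cluster() accepts the same keyword arguments as gen_cluster_script():

# submit all missing batches as an array job (not run here)
c.grow_cluster(scheduler='slurm', minutes=20, gigabytes=1)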
If you are just using the Crop as a persistence mechanism, then xyzpy.Crop.grow() or xyzpy.Crop.grow_missing() will process the batches in the current process:
[12]:
c.grow_missing(parallel=True) # this accepts combo_runner kwargs
100%|##########| 10/10 [00:00<00:00, 10.66it/s]
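Other combo_runner options pass through in the same way; for instance (assuming num_workers is among them):

# assumption: cap the process pool used for parallel growing
c.grow_missing(parallel=True, num_workers=4)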
[13]:
print(c)
/media/johnnie/Storage2TB/Sync/dev/python/xyzpy/docs/examples/.xyz-first_run
-------------------------------------------------------------------=========
20 / 20 batches of size 5 completed
[####################] : 100.0%
Hint

If different function calls might take different amounts of time based on their arguments, you can supply shuffle=True to xyzpy.Crop.sow_combos(). Each batch will then be a random selection of cases, which should even out the effort each takes, as long as batchsize is not too small.
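Concretely, that only changes the sowing call:

c.sow_combos(combos, shuffle=True)  # randomize which cases land in which batch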
Reap the results

The final step is to ‘reap’ the results from disk. Because the crop was instantiated from a Harvester, that harvester will automatically be used to collect the resulting dataset and sync it with the on-disk dataset:
[14]:
c.reap()
100%|##########| 100/100 [00:00<00:00, 89013.24it/s]
[14]:
<xarray.Dataset>
Dimensions:  (a: 10, b: 10)
Coordinates:
  * a        (a) int64 0 1 2 3 4 5 6 7 8 9
  * b        (b) int64 0 1 2 3 4 5 6 7 8 9
Data variables:
    sum      (a, b) int64 0 1 2 3 4 5 6 7 8 9 1 ... 9 10 11 12 13 14 15 16 17 18
    diff     (a, b) int64 0 -1 -2 -3 -4 -5 -6 -7 -8 -9 1 ... 9 8 7 6 5 4 3 2 1 0
The dataset foo_data.h5 should be on disk, and the crop folder cleaned up:
[15]:
ls -a
./ 'dask distributed example.ipynb'*
../ dask-worker-space/
'basic output example.ipynb'* 'farming example.ipynb'*
'complex output example.ipynb'* foo_data.h5
'crop example.ipynb'* .ipynb_checkpoints/
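In a later session the saved dataset can also be loaded directly, independently of the harvester. A sketch, assuming xyzpy's load_ds helper (plain xarray.open_dataset should work too, given a matching engine):

import xyzpy as xyz

ds = xyz.load_ds('foo_data.h5')  # assumption: default engine matches how the harvester saved it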
And we can inspect the results:
[16]:
h.full_ds.xyz.iheatmap('a', 'b', 'diff')
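For a static alternative to the interactive heatmap, xarray's built-in matplotlib plotting works directly on the harvested data:

# 2D DataArray -> pcolormesh via xarray's own .plot()
h.full_ds['diff'].plot()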