resample and boolean indexing with dask-arrays #356

Open · rpnaut opened this issue Nov 2, 2021 · 3 comments
Labels: question (Further information is requested)

rpnaut commented Nov 2, 2021

I want to use the resample_iterations_idx functionality to bootstrap evaluation metrics of hindcasts. The challenge with huge datasets is the memory needed to store all the iteration samples.
I started with the mean squared error skill score and tried to understand the memory consumption. The following small example script demonstrates the metric and its memory footprint:

import numpy
import xskillscore as xskill
import xarray as xr
from memory_profiler import profile

@profile
def AMSEss():
  alpha=0.10
  coordtime="time"
  dimbs="iteration"
  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])

  bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
  bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)

  p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
    ( numpy.square( bsp1 ) ).mean(dim=coordtime)
  amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0,
    p2divp1 - 1.0 ) # could be that nan's are not preserved
  amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
  amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
  del bsp1, bsp2, amsessf
  BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
  BSms = BSms.where( (amsessub)>0, -1.0 )
  BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
  BSms = BSms.where( amsesslb.notnull().data )
  BSms.to_netcdf("bsms_chunked.nc")

if __name__ == '__main__':
  AMSEss()

Running the script from the Linux console gives me the following:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     7    189.4 MiB    189.4 MiB           1   @profile
     8                                         def AMSEss():
     9    189.4 MiB      0.0 MiB           1     alpha=0.10
    10    189.4 MiB      0.0 MiB           1     coordtime="time"
    11    189.4 MiB      0.0 MiB           1     dimbs="iteration"
    12    201.0 MiB     11.6 MiB           1     dsskill = xr.DataArray(data=15 + 20 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    13    212.5 MiB     11.4 MiB           1     dsref   = xr.DataArray(data=15 + 15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    14    223.9 MiB     11.4 MiB           1     dsproof = xr.DataArray(data=15 + 10 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"])
    15                                         
    16   5946.6 MiB   5722.7 MiB           1     bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
    17  11680.0 MiB   5733.4 MiB           1     bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
    18                                         
    19  11718.2 MiB      0.0 MiB           2     p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
    20  11756.3 MiB     38.1 MiB           1       ( numpy.square( bsp1 ) ).mean(dim=coordtime)
    21  11756.4 MiB      0.0 MiB           2     amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0, 
    22  11794.5 MiB     38.1 MiB           1       p2divp1 - 1.0 ) # could be that nan's are not preserved
    23  11756.4 MiB      0.0 MiB           1     amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
    24  11756.4 MiB      0.0 MiB           1     amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
    25    274.2 MiB -11482.2 MiB           1     del bsp1, bsp2, amsessf
    26    274.2 MiB      0.0 MiB           1     BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
    27    274.2 MiB      0.0 MiB           1     BSms = BSms.where( (amsessub)>0, -1.0 )
    28    274.2 MiB      0.0 MiB           1     BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
    29    274.2 MiB      0.0 MiB           1     BSms = BSms.where( amsesslb.notnull().data )

Thus, we need 5.7 GB to store each of the iteration samples bsp1 and bsp2, which is consistent with the size of the arrays. However, climate datasets are often much larger, so I started working with dask arrays and changed the three lines:

  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(100, 100, 150),dims=["x", "y", "time"]).chunk({'time':10})

Now the dask scheduler collects all operations and performs the computation once the netCDF file is written. However, resample_iterations_idx seems to use indices that refer to the unchunked fields rather than to the individual chunks:

  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/optimization.py", line 963, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/sw/spack-rhel6/miniforge3-4.9.2-3-Linux-x86_64-pwdbqi/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/zmaw/m221054/.local/lib/python3.8/site-packages/xskillscore/core/resampling.py", line 193, in select_bootstrap_indices_ufunc
    return np.moveaxis(x.squeeze()[idx.squeeze().transpose()], 0, -1)
IndexError: index 144 is out of bounds for axis 0 with size 10

Is there a way to use the resampling functionality on dask arrays to save memory? It is not clear to me whether this is really parallelizable. As already commented by @dougiesquire in issue #221 (#221 (comment)), there is a problem with boolean indexing when the underlying arrays are dask arrays.

aaronspring (Collaborator) commented:

> Is there a way to use the resampling functionality on dask arrays to save memory?

You can try resample_iterations instead of resample_iterations_idx. That function doesn't use vectorised indexing and, in my experience, is more robust on very large datasets.
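
For reference, a minimal sketch of that swap, reusing the arrays from the script above (only the two resampling calls change):

  # swap suggested above: resample_iterations avoids the vectorised indexing
  bsp1 = xskill.resample_iterations( dsproof - dsref, iterations=500, dim="time", replace=True)
  bsp2 = xskill.resample_iterations( dsskill - dsref, iterations=500, dim="time", replace=True)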

rpnaut (Author) commented Nov 3, 2021

I am so stupid. I may have found two solutions to this problem. I increased the size of the arrays to 200 x 200 x 150.

Solution A: Prior to the resampling, I can remove the chunks from the dask arrays using chunk({'time': -1}). However, this brings me back to the memory problem.
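
For illustration, a sketch of where that rechunking would go, reusing the arrays from the script above (everything else stays unchanged):

  # merge the time chunks into one before resampling along time
  bsp1 = xskill.resample_iterations_idx( (dsproof - dsref).chunk({'time': -1}),
                                         iterations=500, dim="time", replace=True)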

Solution B: I remembered that the sign test xskill.halfwidth_ci_test also fails with dask arrays if the dimension along which the correlation is computed carries multiple chunks. So I avoid chunking the dimension that is resampled, i.e.:

  dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
  dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
  dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
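
If the data already arrives chunked along time (e.g. when opened from files), the same layout can be reached by rechunking; a sketch, with the chunk sizes as placeholders:

  # keep "time" in a single chunk, spread memory over the spatial dimensions
  dsskill = dsskill.chunk({'time': -1, 'x': 10, 'y': 20})
  dsref   = dsref.chunk({'time': -1, 'x': 10, 'y': 20})
  dsproof = dsproof.chunk({'time': -1, 'x': 10, 'y': 20})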

With this chunking, the memory statistics read:

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     7    189.4 MiB    189.4 MiB           1   @profile
     8                                         def AMSEss():
     9    189.4 MiB      0.0 MiB           1     alpha=0.10
    10    189.4 MiB      0.0 MiB           1     coordtime="time"
    11    189.4 MiB      0.0 MiB           1     dimbs="iteration"
    12    235.3 MiB     45.9 MiB           1     dsskill = xr.DataArray(data=15 + 2.1 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
    13    281.1 MiB     45.8 MiB           1     dsref   = xr.DataArray(data=15 + 0.15 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
    14    326.9 MiB     45.8 MiB           1     dsproof = xr.DataArray(data=15 + 2.0 * numpy.random.randn(200, 200, 150),dims=["x", "y", "time"]).chunk({'x':10,'y':20})
    15                                         
    16    327.6 MiB      0.7 MiB           1     bsp1 = xskill.resample_iterations_idx( dsproof - dsref, iterations=500, dim=coordtime,replace=True)
    17    328.2 MiB      0.6 MiB           1     bsp2 = xskill.resample_iterations_idx( dsskill - dsref, iterations=500, dim=coordtime,replace=True)
    18                                         
    19    328.2 MiB      0.0 MiB           2     p2divp1 = ( numpy.square( bsp2 ) ).mean(dim=coordtime) / \
    20    328.2 MiB      0.0 MiB           1       ( numpy.square( bsp1 ) ).mean(dim=coordtime)
    21    328.7 MiB      0.4 MiB           2     amsessf = xr.where( (p2divp1 - 1.0)>0, (-1.0)/p2divp1 + 1.0, 
    22    328.6 MiB      0.0 MiB           1       p2divp1 - 1.0 ) # could be that nan's are not preserved
    23    328.7 MiB      0.0 MiB           1     amsesslb = amsessf.quantile(alpha/2.0,dim=dimbs) #, keep_attrs=True)
    24    328.7 MiB      0.0 MiB           1     amsessub = amsessf.quantile(1.0-alpha/2.0,dim=dimbs) #, keep_attrs=True)
    25    328.7 MiB      0.0 MiB           1     del bsp1, bsp2, amsessf
    26    328.7 MiB      0.0 MiB           1     BSms = xr.where( (amsesslb)>0, amsesslb*0.0 +1.0, amsesslb*0.0 )
    27    328.7 MiB      0.0 MiB           1     BSms = BSms.where( (amsessub)>0, -1.0 )
    28    329.4 MiB      0.7 MiB           1     BSms = BSms.where( (1-(amsesslb<=0)) == (1-(amsessub<=0)), 0.0 )
    29    329.5 MiB      0.1 MiB           1     BSms = BSms.where( amsesslb.notnull().data )
    30    329.5 MiB      0.0 MiB           1     print(BSms)
    31    416.4 MiB     86.9 MiB           1     BSms.to_netcdf("bsms_chunked")

It is not clear how robust this memory measurement is. However, I decided to also track child processes during the execution of the bootstrapping code (mprof --include-children) and got the following graphical results for plain xarray/numpy arrays:
https://1drv.ms/u/s!AktxgZXfBUm6tgmS7cEjl56RSsFW?e=IciPRG
and for xarray/dask arrays:
https://1drv.ms/u/s!AktxgZXfBUm6tgpu67yUeAFZt0SX?e=11hHEU

There is still a memory peak of 8 GB when using dask (it is hard to pin down the reason with this kind of tracking, but maybe resampling the indices requires gathering the small chunks into larger ones). However, the dask arrays a) avoid allocating nearly 50 GB for bsp1 and bsp2 and b) avoid a memory warning on the login nodes of our cluster (the short-term memory peak does not seem to be a problem).

Nevertheless, I would like to reduce the memory allocation further and avoid applying the resampling process twice, i.e. once each for bsp1 and bsp2. I would like to have the indices of each of the 500 iteration samples and apply those indices directly to the metric, e.g.

p2divp1 = ( numpy.square( forecast1[rand_ind] - observations[rand_ind] ) ).mean(dim=coordtime) / \
    ( numpy.square(  forecast2[rand_ind] - observations[rand_ind] ) ).mean(dim=coordtime)

where rand_ind contains the indices of the 500 iteration samples, each sample consisting of time steps chosen randomly (with replacement) from the time series at location x and y.

Is there a small code example that illustrates how to realize this?

aaronspring (Collaborator) commented:
I would start by rebuilding resample_iterations(_idx) manually.
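
A minimal sketch of such a rebuild, drawing the indices once and applying them to both forecast fields (bootstrap_indices and rand_ind are illustrative names, not part of xskillscore; this assumes the time dimension sits in a single chunk, as in Solution B above):

import numpy
import xarray as xr

def bootstrap_indices(n, iterations, dim="time"):
  # draw the resampled time indices once for all iterations (with replacement)
  return xr.DataArray(numpy.random.randint(0, n, size=(iterations, n)),
                      dims=["iteration", dim])

rand_ind = bootstrap_indices(dsref.sizes["time"], iterations=500)

# vectorized indexing adds an "iteration" dimension; the same indices are
# applied to both differences, so the random draw happens only once
bsp1 = (dsproof - dsref).isel(time=rand_ind)
bsp2 = (dsskill - dsref).isel(time=rand_ind)

p2divp1 = numpy.square(bsp2).mean(dim="time") / numpy.square(bsp1).mean(dim="time")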

aaronspring added the question (Further information is requested) label on Oct 29, 2023