Commit db1ad26

A cutdown implementation of apply_gufunc
1 parent e2279da commit db1ad26

File tree

2 files changed: +125 -252 lines changed

2 files changed

+125
-252
lines changed

cubed/core/gufunc.py

Lines changed: 43 additions & 252 deletions
@@ -1,228 +1,83 @@
-import re
-
+import numpy as np
 from dask.array.gufunc import _parse_gufunc_signature, _validate_normalize_axes
 from tlz import concat, merge, unique

+
 def apply_gufunc(
     func,
     signature,
     *args,
     axes=None,
     axis=None,
-    keepdims=False,
     output_dtypes=None,
-    output_sizes=None,
     vectorize=None,
-    allow_rechunk=False,
-    meta=None,
     **kwargs,
 ):
     """
     Apply a generalized ufunc or similar python function to arrays.

-    ``signature`` determines if the function consumes or produces core
-    dimensions. The remaining dimensions in given input arrays (``*args``)
-    are considered loop dimensions and are required to broadcast
-    naturally against each other.
-
-    In other terms, this function is like ``np.vectorize``, but for
-    the blocks of dask arrays. If the function itself shall also
-    be vectorized use ``vectorize=True`` for convenience.
-
-    Parameters
-    ----------
-    func : callable
-        Function to call like ``func(*args, **kwargs)`` on input arrays
-        (``*args``) that returns an array or tuple of arrays. If multiple
-        arguments with non-matching dimensions are supplied, this function is
-        expected to vectorize (broadcast) over axes of positional arguments in
-        the style of NumPy universal functions [1]_ (if this is not the case,
-        set ``vectorize=True``). If this function returns multiple outputs,
-        ``output_core_dims`` has to be set as well.
-    signature: string
-        Specifies what core dimensions are consumed and produced by ``func``.
-        According to the specification of numpy.gufunc signature [2]_
-    *args : numeric
-        Input arrays or scalars to the callable function.
-    axes: List of tuples, optional, keyword only
-        A list of tuples with indices of axes a generalized ufunc should operate on.
-        For instance, for a signature of ``"(i,j),(j,k)->(i,k)"`` appropriate for
-        matrix multiplication, the base elements are two-dimensional matrices
-        and these are taken to be stored in the two last axes of each argument. The
-        corresponding axes keyword would be ``[(-2, -1), (-2, -1), (-2, -1)]``.
-        For simplicity, for generalized ufuncs that operate on 1-dimensional arrays
-        (vectors), a single integer is accepted instead of a single-element tuple,
-        and for generalized ufuncs for which all outputs are scalars, the output
-        tuples can be omitted.
-    axis: int, optional, keyword only
-        A single axis over which a generalized ufunc should operate. This is a short-cut
-        for ufuncs that operate over a single, shared core dimension, equivalent to passing
-        in axes with entries of (axis,) for each single-core-dimension argument and ``()`` for
-        all others. For instance, for a signature ``"(i),(i)->()"``, it is equivalent to passing
-        in ``axes=[(axis,), (axis,), ()]``.
-    keepdims: bool, optional, keyword only
-        If this is set to True, axes which are reduced over will be left in the result as
-        a dimension with size one, so that the result will broadcast correctly against the
-        inputs. This option can only be used for generalized ufuncs that operate on inputs
-        that all have the same number of core dimensions and with outputs that have no core
-        dimensions, i.e., with signatures like ``"(i),(i)->()"`` or ``"(m,m)->()"``.
-        If used, the location of the dimensions in the output can be controlled with axes
-        and axis.
-    output_dtypes : Optional, dtype or list of dtypes, keyword only
-        Valid numpy dtype specification or list thereof.
-        If not given, a call of ``func`` with a small set of data
-        is performed in order to try to automatically determine the
-        output dtypes.
-    output_sizes : dict, optional, keyword only
-        Optional mapping from dimension names to sizes for outputs. Only used if
-        new core dimensions (not found on inputs) appear on outputs.
-    vectorize: bool, keyword only
-        If set to ``True``, ``np.vectorize`` is applied to ``func`` for
-        convenience. Defaults to ``False``.
-    allow_rechunk: Optional, bool, keyword only
-        Allows rechunking, otherwise chunk sizes need to match and core
-        dimensions are to consist only of one chunk.
-        Warning: enabling this can increase memory usage significantly.
-        Defaults to ``False``.
-    meta: Optional, tuple, keyword only
-        tuple of empty ndarrays describing the shape and dtype of the output of the gufunc.
-        Defaults to ``None``.
-    **kwargs : dict
-        Extra keyword arguments to pass to `func`
+    This is a cutdown version of the
+    `equivalent function <https://docs.dask.org/en/stable/generated/dask.array.gufunc.apply_gufunc.html>`_
+    in Dask. Refer there for usage information.

-    Returns
-    -------
-    Single dask.array.Array or tuple of dask.array.Array
+    Current limitations: ``keepdims``, ``output_sizes``, and ``allow_rechunk`` are not supported;
+    and multiple outputs are not supported.

-    Examples
-    --------
-    >>> import dask.array as da
-    >>> import numpy as np
-    >>> def stats(x):
-    ...     return np.mean(x, axis=-1), np.std(x, axis=-1)
-    >>> a = da.random.normal(size=(10,20,30), chunks=(5, 10, 30))
-    >>> mean, std = da.apply_gufunc(stats, "(i)->(),()", a)
-    >>> mean.compute().shape
-    (10, 20)
+    Cubed assumes that ``func`` will allocate a new output array. However, if it allocates more memory
+    than that, then you need to tell Cubed about it by setting the ``extra_required_mem`` parameter
+    to the amount needed in bytes (per task).
+    """

+    # Currently the following parameters cannot be changed
+    keepdims = False
+    output_sizes = None
+    allow_rechunk = False

-    >>> def outer_product(x, y):
-    ...     return np.einsum("i,j->ij", x, y)
-    >>> a = da.random.normal(size=( 20,30), chunks=(10, 30))
-    >>> b = da.random.normal(size=(10, 1,40), chunks=(5, 1, 40))
-    >>> c = da.apply_gufunc(outer_product, "(i),(j)->(i,j)", a, b, vectorize=True)
-    >>> c.compute().shape
-    (10, 20, 30, 40)
+    # based on dask's apply_gufunc

-    References
-    ----------
-    .. [1] https://docs.scipy.org/doc/numpy/reference/ufuncs.html
-    .. [2] https://docs.scipy.org/doc/numpy/reference/c-api/generalized-ufuncs.html
-    """
     # Input processing:
-    ## Signature
+
+    # Signature
     if not isinstance(signature, str):
         raise TypeError("`signature` has to be of type string")
-    # NumPy versions before https://github.com/numpy/numpy/pull/19627
-    # would not ignore whitespace characters in `signature` like they
-    # are supposed to. We remove the whitespace here as a workaround.
-    signature = re.sub(r"\s+", "", signature)
     input_coredimss, output_coredimss = _parse_gufunc_signature(signature)

-    ## Determine nout: nout = None for functions of one direct return; nout = int for return tuples
+    # Determine nout: nout = None for functions of one direct return; nout = int for return tuples
     nout = None if not isinstance(output_coredimss, list) else len(output_coredimss)

-    ## Consolidate onto `meta`
-    if meta is not None and output_dtypes is not None:
-        raise ValueError(
-            "Only one of `meta` and `output_dtypes` should be given (`meta` is preferred)."
+    if nout is not None:
+        raise NotImplementedError(
+            "Multiple outputs are not yet supported, see https://github.com/tomwhite/cubed/issues/69"
         )
-    if meta is None:
-        if output_dtypes is None:
-            ## Infer `output_dtypes`
-            if vectorize:
-                tempfunc = np.vectorize(func, signature=signature)
-            else:
-                tempfunc = func
-            output_dtypes = apply_infer_dtype(
-                tempfunc, args, kwargs, "apply_gufunc", "output_dtypes", nout
-            )
-
-        ## Turn `output_dtypes` into `meta`
-        if (
-            nout is None
-            and isinstance(output_dtypes, (tuple, list))
-            and len(output_dtypes) == 1
-        ):
-            output_dtypes = output_dtypes[0]
-        sample = args[0] if args else None
-        if nout is None:
-            meta = meta_from_array(sample, dtype=output_dtypes)
-        else:
-            meta = tuple(meta_from_array(sample, dtype=odt) for odt in output_dtypes)

-    ## Normalize `meta` format
-    meta = meta_from_array(meta)
-    if isinstance(meta, list):
-        meta = tuple(meta)
-
-    ## Validate `meta`
-    if nout is None:
-        if isinstance(meta, tuple):
-            if len(meta) == 1:
-                meta = meta[0]
-            else:
-                raise ValueError(
-                    "For a function with one output, must give a single item for `output_dtypes`/`meta`, "
-                    "not a tuple or list."
-                )
-    else:
-        if not isinstance(meta, tuple):
-            raise ValueError(
-                f"For a function with {nout} outputs, must give a tuple or list for `output_dtypes`/`meta`, "
-                "not a single item."
-            )
-        if len(meta) != nout:
-            raise ValueError(
-                f"For a function with {nout} outputs, must give a tuple or list of {nout} items for "
-                f"`output_dtypes`/`meta`, not {len(meta)}."
-            )
-
-    ## Vectorize function, if required
+    # Vectorize function, if required
     if vectorize:
-        otypes = [x.dtype for x in meta] if isinstance(meta, tuple) else [meta.dtype]
+        otypes = output_dtypes
         func = np.vectorize(func, signature=signature, otypes=otypes)

-    ## Miscellaneous
+    # Miscellaneous
     if output_sizes is None:
         output_sizes = {}

-    ## Axes
+    # Axes
     input_axes, output_axes = _validate_normalize_axes(
         axes, axis, keepdims, input_coredimss, output_coredimss
     )

     # Main code:
-    ## Cast all input arrays to dask
-    args = [asarray(a) for a in args]
+
+    # Cast all input arrays to cubed
+    # args = [asarray(a) for a in args]  # TODO: do we need to support casting?

     if len(input_coredimss) != len(args):
         raise ValueError(
             "According to `signature`, `func` requires %d arguments, but %s given"
             % (len(input_coredimss), len(args))
         )

-    ## Axes: transpose input arguments
-    transposed_args = []
-    for arg, iax in zip(args, input_axes):
-        shape = arg.shape
-        iax = tuple(a if a < 0 else a - len(shape) for a in iax)
-        tidc = tuple(i for i in range(-len(shape) + 0, 0) if i not in iax) + iax
-        transposed_arg = arg.transpose(tidc)
-        transposed_args.append(transposed_arg)
-    args = transposed_args
+    # Note (cubed): since we don't support allow_rechunk=True, there is no need to transpose args (and outputs back again)

-    ## Assess input args for loop dims
+    # Assess input args for loop dims
     input_shapes = [a.shape for a in args]
     input_chunkss = [a.chunks for a in args]
     num_loopdims = [len(s) - len(cd) for s, cd in zip(input_shapes, input_coredimss)]
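
The NotImplementedError branch above relies on the shape of what `_parse_gufunc_signature` returns: a single-output signature parses to a plain tuple of output core dimensions, while a multi-output signature parses to a list of tuples. A quick sketch of the behaviour of dask's private helper as this code uses it:

    from dask.array.gufunc import _parse_gufunc_signature

    # Single output: output core dims come back as a tuple, so nout is
    # None and the NotImplementedError branch is not taken.
    _parse_gufunc_signature("(i,j),(j,k)->(i,k)")
    # -> ([('i', 'j'), ('j', 'k')], ('i', 'k'))

    # Multiple outputs: output core dims come back as a list of tuples,
    # so nout == 2 and the cutdown version raises NotImplementedError.
    _parse_gufunc_signature("(i)->(),()")
    # -> ([('i',)], [(), ()])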
@@ -238,12 +93,12 @@ def apply_gufunc(
         tuple("__loopdim%d__" % d for d in range(max_loopdims - n, max_loopdims))
         for n in num_loopdims
     ]
-    input_dimss = [l + c for l, c in zip(loop_input_dimss, input_coredimss)]
+    input_dimss = [lp + c for lp, c in zip(loop_input_dimss, input_coredimss)]

     loop_output_dims = max(loop_input_dimss, key=len) if loop_input_dimss else tuple()

-    ## Assess input args for same size and chunk sizes
-    ### Collect sizes and chunksizes of all dims in all arrays
+    # Assess input args for same size and chunk sizes
+    # Collect sizes and chunksizes of all dims in all arrays
     dimsizess = {}
     chunksizess = {}
     for dims, shape, chunksizes in zip(input_dimss, input_shapes, input_chunkss):
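
To make the loop-dimension bookkeeping in this hunk concrete, here is a small worked trace (hypothetical shapes; dimension names follow the code above):

    # For signature "(i),(j)->(i,j)" with inputs
    #   a.shape == (10, 20, 5)  -> core dims ('i',), so 2 loop dims
    #   b.shape == (7,)         -> core dims ('j',), so 0 loop dims
    # the code computes
    #   num_loopdims     == [2, 0]
    #   loop_input_dimss == [('__loopdim0__', '__loopdim1__'), ()]
    #   input_dimss      == [('__loopdim0__', '__loopdim1__', 'i'), ('j',)]
    #   loop_output_dims == ('__loopdim0__', '__loopdim1__')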
@@ -254,14 +109,14 @@
             chunksizes_ = chunksizess.get(dim, [])
             chunksizes_.append(chunksize)
             chunksizess[dim] = chunksizes_
-    ### Assert correct partitioning, for case:
+    # Assert correct partitioning, for case:
     for dim, sizes in dimsizess.items():
-        #### Check that the arrays have same length for same dimensions or dimension `1`
+        # Check that the arrays have same length for same dimensions or dimension `1`
         if set(sizes) | {1} != {1, max(sizes)}:
             raise ValueError(f"Dimension `'{dim}'` with different lengths in arrays")
         if not allow_rechunk:
             chunksizes = chunksizess[dim]
-            #### Check if core dimensions consist of only one chunk
+            # Check if core dimensions consist of only one chunk
             if (dim in core_shapes) and (chunksizes[0][0] < core_shapes[dim]):
                 raise ValueError(
                     "Core dimension `'{}'` consists of multiple chunks. To fix, rechunk into a single \
@@ -270,7 +125,7 @@
                         dim
                     )
                 )
-            #### Check if loop dimensions consist of same chunksizes, when they have sizes > 1
+            # Check if loop dimensions consist of same chunksizes, when they have sizes > 1
             relevant_chunksizes = list(
                 unique(c for s, c in zip(sizes, chunksizes) if s > 1)
             )
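
Because `allow_rechunk` is pinned to False in this version, every core dimension must already be a single chunk when `apply_gufunc` is called. A hedged illustration of the failure mode (the `rechunk` fix follows the wording of the error message above; the exact rechunk API is an assumption):

    # Hypothetical: a has shape (10, 20, 30) with chunks (5, 10, 15).
    # For signature "(i)->()" the core dimension "i" is the last axis,
    # which spans two chunks of size 15, so the check above raises
    # ValueError. Rechunking that axis into a single chunk fixes it:
    #   a = a.rechunk((5, 10, 30))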
@@ -279,76 +134,12 @@
                     f"Dimension `'{dim}'` with different chunksize present"
                 )

-    ## Apply function - use blockwise here
+    # Apply function - use blockwise here
     arginds = list(concat(zip(args, input_dimss)))

-    ### Use existing `blockwise` but only with loopdims to enforce
-    ### concatenation for coredims that appear also at the output
-    ### Modifying `blockwise` could improve things here.
-    tmp = blockwise(
-        func, loop_output_dims, *arginds, concatenate=True, meta=meta, **kwargs
-    )
-
-    # NOTE: we likely could just use `meta` instead of `tmp._meta`,
-    # but we use it and validate it anyway just to be sure nothing odd has happened.
-    metas = tmp._meta
-    if nout is None:
-        assert not isinstance(
-            metas, (list, tuple)
-        ), f"meta changed from single output to multiple output during blockwise: {meta} -> {metas}"
-        metas = (metas,)
-    else:
-        assert isinstance(
-            metas, (list, tuple)
-        ), f"meta changed from multiple output to single output during blockwise: {meta} -> {metas}"
-        assert (
-            len(metas) == nout
-        ), f"Number of outputs changed from {nout} to {len(metas)} during blockwise"
-
-    ## Prepare output shapes
-    loop_output_shape = tmp.shape
-    loop_output_chunks = tmp.chunks
-    keys = list(flatten(tmp.__dask_keys__()))
-    name, token = keys[0][0].split("-")
-
-    ### *) Treat direct output
-    if nout is None:
-        output_coredimss = [output_coredimss]
-
-    ## Split output
-    leaf_arrs = []
-    for i, (ocd, oax, meta) in enumerate(zip(output_coredimss, output_axes, metas)):
-        core_output_shape = tuple(core_shapes[d] for d in ocd)
-        core_chunkinds = len(ocd) * (0,)
-        output_shape = loop_output_shape + core_output_shape
-        output_chunks = loop_output_chunks + core_output_shape
-        leaf_name = "%s_%d-%s" % (name, i, token)
-        leaf_dsk = {
-            (leaf_name,)
-            + key[1:]
-            + core_chunkinds: ((getitem, key, i) if nout else key)
-            for key in keys
-        }
-        graph = HighLevelGraph.from_collections(leaf_name, leaf_dsk, dependencies=[tmp])
-        meta = meta_from_array(meta, len(output_shape))
-        leaf_arr = Array(
-            graph, leaf_name, chunks=output_chunks, shape=output_shape, meta=meta
-        )
-
-        ### Axes:
-        if keepdims:
-            slices = len(leaf_arr.shape) * (slice(None),) + len(oax) * (np.newaxis,)
-            leaf_arr = leaf_arr[slices]
+    from cubed.core.ops import blockwise

-        tidcs = [None] * len(leaf_arr.shape)
-        for ii, oa in zip(range(-len(oax), 0), oax):
-            tidcs[oa] = ii
-        j = 0
-        for ii in range(len(tidcs)):
-            if tidcs[ii] is None:
-                tidcs[ii] = j
-                j += 1
-        leaf_arr = leaf_arr.transpose(tidcs)
-        leaf_arrs.append(leaf_arr)
+    # Note (cubed): use blockwise on all output dimensions, not just loop_output_dims like in original
+    out_ind = loop_output_dims + output_coredimss

-    return (*leaf_arrs,) if nout else leaf_arrs[0]  # Undo *) from above
+    return blockwise(func, out_ind, *arginds, dtype=output_dtypes, **kwargs)
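
For reference, a minimal usage sketch of the cutdown function (array construction via `cubed.from_array` is an assumption for illustration and may not match this commit; the call itself follows the signature above):

    import numpy as np

    import cubed
    from cubed.core.gufunc import apply_gufunc

    # Hypothetical input; the chunks keep the core dimension "i"
    # (the last axis) in a single chunk, as required above.
    a = cubed.from_array(np.random.normal(size=(10, 20, 30)), chunks=(5, 10, 30))

    def mean(x):
        return np.mean(x, axis=-1)

    # "(i)->()" consumes the last axis as the core dimension; the two
    # leading loop axes remain, so the result has shape (10, 20).
    m = apply_gufunc(mean, "(i)->()", a, output_dtypes=np.float64)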
