Merge pull request #6520 from markotoplak/dask-subarray
[ENH] SubarrayComputeValue for faster domain transformation
markotoplak committed Oct 10, 2023
2 parents a82ec37 + d3f9d1a commit 78f27e1
Showing 8 changed files with 437 additions and 67 deletions.
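
What the change does, in short: domain conversion previously computed every derived column through SharedComputeValue one column at a time, even when a whole run of columns came from the same shared computation. The new SubarrayComputeValue (see Orange/data/util.py below) carries an index into a shared 2-D result, and _ArrayConversion now groups adjacent source columns so the shared function runs once per group. A minimal sketch of the contract — double_block and the plain-ndarray input are illustrative assumptions, not part of the commit:

import numpy as np
from Orange.data.util import SubarrayComputeValue

def double_block(data, cols):
    # called once per group with the list of requested column indices;
    # must return a 2-D block with one output column per entry in cols
    return np.asarray(data)[:, cols] * 2

# one compute value per target column, all sharing the same function
compute_values = [SubarrayComputeValue(double_block, i) for i in range(4)]
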
5 changes: 0 additions & 5 deletions Orange/data/dask.py
@@ -34,11 +34,6 @@ def join_partial_results(self, parts):
return dask.array.vstack(parts)
return super().join_partial_results(parts)

def prepare_column(self, col_array):
if self.is_dask:
return col_array.reshape(-1, 1)
return super().prepare_column(self, col_array)

def join_columns(self, data):
if self.is_dask:
return dask.array.hstack(data)
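
prepare_column disappears from the dask subclass because, after this change, every part reaching join_columns is already a 2-D block, so no per-column reshape is needed before dask.array.hstack. A standalone illustration (not commit code):

import dask.array as da

a = da.ones((100, 3), chunks=50)   # one converted group
b = da.zeros((100, 2), chunks=50)  # another group
joined = da.hstack([a, b])         # shape (100, 5); blocks join as-is
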
188 changes: 127 additions & 61 deletions Orange/data/table.py
Expand Up @@ -8,6 +8,7 @@
from collections.abc import Iterable, Sequence, Sized
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from functools import reduce
from itertools import chain
from numbers import Real, Integral
@@ -28,7 +29,8 @@
DomainConversion)
from Orange.data.util import SharedComputeValue, \
assure_array_dense, assure_array_sparse, \
assure_column_dense, assure_column_sparse, get_unique_names_duplicates
assure_column_dense, assure_column_sparse, get_unique_names_duplicates, \
SubarrayComputeValue
from Orange.misc.collections import frozendict
from Orange.statistics.util import bincount, countnans, contingency, \
stats as fast_stats, sparse_has_implicit_zeros, sparse_count_implicit_zeros, \
@@ -218,6 +220,16 @@ def _compute_column(func, *args, **kwargs):
return col


class Conversion(Enum):
X = 0
Y = 1
METAS = 2
SEPARATE = 10
SHARED = 11
SUBARRAY = 12
UNKNOWN = 99


class _ArrayConversion:
def __init__(self, target, src_cols, variables, is_sparse, source_domain):
self.target = target
@@ -232,6 +244,10 @@ def __init__(self, target, src_cols, variables, is_sparse, source_domain):
self.dtype = dtype
self.row_selection_needed = any(not isinstance(x, Integral)
for x in src_cols)
self.transform_groups = self._create_groups(source_domain)
self.match_density = (
assure_array_sparse if self.is_sparse else assure_array_dense
)

def _can_copy_all(self, src_cols, source_domain):
n_src_attrs = len(source_domain.attributes)
@@ -244,6 +260,51 @@ def _can_copy_all(self, src_cols, source_domain):
for x in src_cols):
return "Y"

def _create_groups(self, source_domain):
n_src_attrs = len(source_domain.attributes)

groups = []

def add_group(desc, group):
if not group:
return # skip adding empty groups
if desc[0] in {Conversion.X, Conversion.Y, Conversion.METAS, Conversion.SUBARRAY}:
group = _optimize_indices(group, 10e30) # maxlen should not be an issue
groups.append((desc, group))

current_group = []
current_desc = None
for i, col in enumerate(self.src_cols):
if col is None:
desc = (Conversion.UNKNOWN, self.variables[i].Unknown)
elif not isinstance(col, Integral):
if isinstance(col, SubarrayComputeValue):
desc = (Conversion.SUBARRAY, col.compute_shared)
col = col.index
elif isinstance(col, SharedComputeValue):
desc = (Conversion.SHARED, col.compute_shared)
else:
desc = (Conversion.SEPARATE, i) # add index to guarantee non-repetition
elif col < 0:
desc = (Conversion.METAS,)
col = -1 - col
elif col < n_src_attrs:
desc = (Conversion.X,)
else:
desc = (Conversion.Y,)
col = col - n_src_attrs

if current_desc == desc:
current_group.append(col)
else:
add_group(current_desc, current_group)
current_group = [col]
current_desc = desc

add_group(current_desc, current_group)

return groups
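
_create_groups is essentially a run-length encoding of conversion descriptors: consecutive source columns with an equal descriptor merge into one group, and for the X/Y/METAS/SUBARRAY kinds the collected indices then go through _optimize_indices so contiguous runs can become slices. The same grouping idea in isolation (a sketch, not the commit's code):

from itertools import groupby

def group_adjacent(cols, kind):
    # merge consecutive columns whose descriptor compares equal,
    # as _create_groups does with its (Conversion, ...) tuples
    return [(desc, [c for _, c in run])
            for desc, run in groupby(((kind(c), c) for c in cols),
                                     key=lambda pair: pair[0])]

# negative indices address metas, as in the code above
print(group_adjacent([0, 1, 2, -1, -2, 5],
                     lambda c: "metas" if c < 0 else "X"))
# -> [('X', [0, 1, 2]), ('metas', [-1, -2]), ('X', [5])]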

def get_subarray(self, source, row_indices):
n_rows = _selection_length(row_indices, len(source))
if not len(self.src_cols):
@@ -252,16 +313,15 @@ def get_subarray(self, source, row_indices):
else:
return np.zeros((n_rows, 0), dtype=source.X.dtype)

match_density = assure_array_sparse if self.is_sparse else assure_array_dense
n_src_attrs = len(source.domain.attributes)
if self.subarray_from == "X":
arr = match_density(_subarray(source.X, row_indices, self.src_cols))
arr = self.match_density(_subarray(source.X, row_indices, self.src_cols))
elif self.subarray_from == "metas":
arr = match_density(_subarray(source.metas, row_indices,
[-1 - x for x in self.src_cols]))
arr = self.match_density(_subarray(source.metas, row_indices,
[-1 - x for x in self.src_cols]))
elif self.subarray_from == "Y":
Y = source.Y if source.Y.ndim == 2 else source.Y[:, None]
arr = match_density(_subarray(
arr = self.match_density(_subarray(
Y, row_indices,
[x - n_src_attrs for x in self.src_cols]))
else:
@@ -271,14 +331,7 @@ def get_subarray(self, source, row_indices):
assert arr.ndim == 2 or self.subarray_from == "Y" and arr.ndim == 1
return arr

def get_columns(self, source, row_indices, out=None, target_indices=None):
n_rows = _selection_length(row_indices, len(source))
n_src_attrs = len(source.domain.attributes)

data = []
match_density = (
assure_column_sparse if self.is_sparse else assure_column_dense
)
def prepare_parts(self, source, row_indices, n_rows):

# converting to csc once beforehand, instead of per column, is faster
# do not convert if not required
@@ -298,64 +351,73 @@ def get_columns(self, source, row_indices, out=None, target_indices=None):
sourceri = source[row_indices]

shared_cache = _thread_local.conversion_cache
for i, col in enumerate(self.src_cols):
if col is None:
col_array = match_density(
np.full((n_rows, 1), self.variables[i].Unknown)
)
elif not isinstance(col, Integral):
if isinstance(col, SharedComputeValue):
shared = _idcache_restore(shared_cache, (col.compute_shared, source))
if shared is None:
shared = col.compute_shared(sourceri)
_idcache_save(shared_cache, (col.compute_shared, source), shared)
col_array = match_density(
_compute_column(col, sourceri, shared_data=shared))
for i, (desc, cols) in enumerate(self.transform_groups):

if desc[0] == Conversion.UNKNOWN:
yield np.full((n_rows, len(cols)), desc[1])

elif desc[0] == Conversion.SHARED:
compute_shared = desc[1]
shared = _idcache_restore(shared_cache, desc[1:] + (source,))
if shared is None:
shared = compute_shared(sourceri)
_idcache_save(shared_cache, desc[1:] + (source,), shared)
t = []
for c in cols:
t.append(self.match_density(
c(sourceri, shared_data=shared).reshape(-1, 1)))
yield self.join_columns(t)

elif desc[0] == Conversion.SUBARRAY:
compute_shared = desc[1]
shared = compute_shared(sourceri, cols)
yield shared

elif desc[0] == Conversion.SEPARATE:
r = cols[0](sourceri)
if not hasattr(r, "shape"):
yield np.broadcast_to(r, (n_rows, 1))
else:
col_array = match_density(_compute_column(col, sourceri))
elif col < 0:
col_array = match_density(
source.metas[row_indices, -1 - col]
)
elif col < n_src_attrs:
col_array = match_density(X[row_indices, col])
yield r.reshape(n_rows, 1)

elif desc[0] == Conversion.METAS:
yield _sa(source.metas, row_indices, cols)

elif desc[0] == Conversion.X:
yield _sa(X, row_indices, cols)

elif desc[0] == Conversion.Y:
yield _sa(Y, row_indices, cols)

else:
col_array = match_density(
Y[row_indices, col - n_src_attrs]
)
raise Exception("Unknown conversion type")

def get_columns(self, source, row_indices, out=None, target_indices=None):
n_rows = _selection_length(row_indices, len(source))

data = []

cpos = 0
for col_array in self.prepare_parts(source, row_indices, n_rows):
col_array = self.match_density(col_array)
rows, cols = col_array.shape

if self.results_inplace:
out[target_indices, i] = col_array
out[target_indices, slice(cpos, cpos+cols)] = col_array
else:
data.append(self.prepare_column(col_array))
data.append(col_array)
cpos += cols

if self.results_inplace:
return out
else:
return self.join_columns(data)
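
Note the bookkeeping change from the old loop, which wrote one column per iteration (out[target_indices, i] = col_array): a block yielded by prepare_parts may span several columns, so the cursor cpos advances by each block's width. The pattern in isolation:

import numpy as np

blocks = [np.zeros((4, 2)), np.ones((4, 1)), np.full((4, 3), 7.0)]
out = np.empty((4, 6))
cpos = 0
for block in blocks:
    n_cols = block.shape[1]
    out[:, cpos:cpos + n_cols] = block  # one assignment per block
    cpos += n_cols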

def prepare_column(self, col_array):
return col_array

def join_columns(self, data):
if self.is_sparse:
# creating csr directly would need plenty of manual work which
# would probably slow down the process - conversion coo to csr
# is fast
coo_data = []
coo_col = []
coo_row = []
for i, col_array in enumerate(data):
coo_data.append(col_array.data)
coo_col.append(np.full(len(col_array.data), i))
coo_row.append(col_array.indices) # row indices should be same
n_rows = col_array.shape[0] # pylint: disable=undefined-loop-variable
out = sp.coo_matrix(
(np.hstack(coo_data), (np.hstack(coo_row), np.hstack(coo_col))),
shape=(n_rows, len(self.src_cols)),
dtype=self.dtype
)
return out.tocsr()
return sp.hstack(data)
else:
return np.hstack(data)
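
The hand-built COO assembly above is the old per-column path; with multi-column blocks the code now delegates to sp.hstack (and np.hstack for dense). Equivalent in effect, as a standalone illustration:

import numpy as np
import scipy.sparse as sp

parts = [sp.random(5, 2, density=0.4, format="csc"),
         sp.random(5, 3, density=0.4, format="csc")]
joined = sp.hstack(parts)  # sparse result, shape (5, 5)
assert (joined.toarray()
        == np.hstack([p.toarray() for p in parts])).all()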

def join_partial_results(self, parts):
if self.is_sparse:
@@ -2455,7 +2517,11 @@ def _subarray(arr, rows, cols):
if arr.ndim == 1:
return arr[rows]
cols = _optimize_indices(cols, arr.shape[1])
if isinstance(rows, slice) or isinstance(cols, slice):
return _sa(arr, rows, cols)


def _sa(arr, rows, cols):
if isinstance(rows, slice) or isinstance(cols, slice) or rows is ... or cols is ...:
return arr[rows, cols]
else:
# rows and columns are independent selectors,
@@ -2473,7 +2539,7 @@ def _optimize_indices(indices, size):
exception. An IndexError is raised if boolean indices do not conform
to input size.
Allows numpy to reuse the data array, because it defaults to copying
Allows numpy to reuse the data array, because numpy defaults to copying
if given indices.
Parameters
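
The docstring fix above belongs to _optimize_indices, which rewrites index arrays as slices where possible precisely because numpy returns a view for a slice but copies for fancy indexing. A quick demonstration:

import numpy as np

arr = np.arange(12).reshape(3, 4)
view = arr[:, 1:3]      # slice: a view sharing memory with arr
copy = arr[:, [1, 2]]   # index list: numpy always copies
assert np.shares_memory(arr, view) and not np.shares_memory(arr, copy)
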
19 changes: 19 additions & 0 deletions Orange/data/util.py
@@ -104,6 +104,25 @@ def __hash__(self):
return hash((type(self), self.compute_shared, self.variable))


class SubarrayComputeValue(SharedComputeValue):

def __init__(self, compute_shared, index, variable=None):
super().__init__(compute_shared, variable)
self.index = index

def __call__(self, data, shared_data=None):
"""Fallback."""
shared_data = self.compute_shared(data, [self.index])
return shared_data

def __eq__(self, other):
return super().__eq__(other) \
and self.index == other.index

def __hash__(self):
return hash((super().__hash__(), self.index))


def vstack(arrays):
"""vstack that supports sparse and dense arrays
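
Inside table conversion the instances are grouped and the shared function is called once with all grouped indices (see _create_groups and prepare_parts in table.py above); the __call__ fallback covers standalone use by asking the shared function for just this value's own column. A sketch with a made-up shared function:

import numpy as np
from Orange.data.util import SubarrayComputeValue

def shared(data, cols):
    # hypothetical shared computation: a 2-D block with one column
    # per requested index
    return np.asarray(data)[:, cols] * 2

cv = SubarrayComputeValue(shared, index=3)
column = cv(np.arange(20).reshape(4, 5))  # fallback: shared(data, [3])
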
