From a806c7a445871e1722fdc2445e002e434229edfc Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Wed, 26 Oct 2022 13:33:12 +0200
Subject: [PATCH 1/9] Benchmark data transform for dask tables

---
 benchmark/bench_transform.py | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

diff --git a/benchmark/bench_transform.py b/benchmark/bench_transform.py
index 4ca481a9801..d6f1733f8f8 100644
--- a/benchmark/bench_transform.py
+++ b/benchmark/bench_transform.py
@@ -2,8 +2,10 @@
 import numpy as np
 import scipy.sparse
+import dask.array

 from Orange.data import Table, ContinuousVariable, Domain
+from Orange.tests.test_dasktable import temp_dasktable
 from Orange.tests.test_table import preprocess_domain_single, preprocess_domain_shared

 from .base import Benchmark, benchmark
@@ -32,6 +34,12 @@ def setup_sparse(self, rows, cols):
             Domain([ContinuousVariable(str(i), sparse=True) for i in range(cols)]),
             sparse)

+    def setup_dask(self, rows, cols):
+        table = Table.from_numpy(  # pylint: disable=W0201
+            Domain([ContinuousVariable(str(i)) for i in range(cols)]),
+            np.random.RandomState(0).rand(rows, cols))
+        self.table = temp_dasktable(table)
+
     @benchmark(setup=partial(setup_dense, rows=10000, cols=100), number=5)
     def bench_copy_dense_long(self):
         add_unknown_attribute(self.table)
@@ -44,6 +52,21 @@ def bench_copy_dense_square(self):
     def bench_copy_dense_wide(self):
         add_unknown_attribute(self.table)

+    @benchmark(setup=partial(setup_dask, rows=10000, cols=100), number=5)
+    def bench_copy_dask_long(self):
+        t = add_unknown_attribute(self.table)
+        self.assertIsInstance(t.X, dask.array.Array)
+
+    @benchmark(setup=partial(setup_dask, rows=1000, cols=1000), number=5)
+    def bench_copy_dask_square(self):
+        t = add_unknown_attribute(self.table)
+        self.assertIsInstance(t.X, dask.array.Array)
+
+    @benchmark(setup=partial(setup_dask, rows=100, cols=10000), number=2)
+    def bench_copy_dask_wide(self):
+        t = add_unknown_attribute(self.table)
+        self.assertIsInstance(t.X, dask.array.Array)
+
     @benchmark(setup=partial(setup_sparse, rows=10000, cols=100), number=5)
     def bench_copy_sparse_long(self):
         t = add_unknown_attribute(self.table)
@@ -63,6 +86,13 @@ def bench_copy_sparse_wide(self):
     def bench_subarray_dense_long(self):
         # adding a class should link X
-        add_unknown_class(self.table)
+        t = add_unknown_class(self.table)
+        self.assertIs(self.table.X, t.X)
+
+    @benchmark(setup=partial(setup_dask, rows=10000, cols=100), number=5)
+    def bench_subarray_dask_long(self):
+        # adding a class should link X
+        t = add_unknown_class(self.table)
+        self.assertIs(self.table.X, t.X)

     def setup_dense_transforms(self, rows, cols, transforms):
         self.setup_dense(rows, cols)
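Editorial sketch, not part of the patch: `add_unknown_attribute` lives in the
benchmark's base module; a hypothetical minimal version of the operation being
timed looks roughly like this — transform to a domain with one extra variable
that has no compute value, so the new column is all-unknown (and, for dask
tables, `X` should stay a lazy `dask.array.Array`).

```python
import numpy as np
from Orange.data import Table, Domain, ContinuousVariable

def add_unknown_attribute(table):
    # hypothetical reimplementation for illustration only
    new_var = ContinuousVariable("new")  # no compute_value -> unknown values
    domain = Domain(table.domain.attributes + (new_var,))
    return table.transform(domain)

table = Table.from_numpy(
    Domain([ContinuousVariable(str(i)) for i in range(3)]),
    np.random.RandomState(0).rand(5, 3))
t = add_unknown_attribute(table)
assert np.all(np.isnan(t.X[:, -1]))  # the added column is unknown
```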
From 2b4efde3b912256521d7dbb84c2589ee8844d13b Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Wed, 26 Oct 2022 13:53:24 +0200
Subject: [PATCH 2/9] Benchmark normalization for numpy and dask tables

---
 benchmark/bench_normalize.py | 40 ++++++++++++++++++++++++++++++++++++
 1 file changed, 40 insertions(+)
 create mode 100644 benchmark/bench_normalize.py

diff --git a/benchmark/bench_normalize.py b/benchmark/bench_normalize.py
new file mode 100644
index 00000000000..d78677822c7
--- /dev/null
+++ b/benchmark/bench_normalize.py
@@ -0,0 +1,40 @@
+import numpy as np
+
+from Orange.data import Table, ContinuousVariable, Domain
+from Orange.preprocess import Normalize
+from Orange.tests.test_dasktable import temp_dasktable
+
+from .base import Benchmark, benchmark
+
+
+class BenchNormalize(Benchmark):
+
+    def setUp(self):
+        rows = 10000
+        cols = 1000
+        self.table = Table.from_numpy(  # pylint: disable=W0201
+            Domain([ContinuousVariable(str(i)) for i in range(cols)]),
+            np.random.RandomState(0).rand(rows, cols))
+        self.dasktable = temp_dasktable(self.table)
+        self.normalized_domain = Normalize()(self.table).domain
+        self.normalized_dasktable = self.dasktable.transform(self.normalized_domain)
+
+    @benchmark(number=5)
+    def bench_normalize_dense(self):
+        Normalize()(self.table)
+
+    @benchmark(number=5)
+    def bench_normalize_dask(self):
+        Normalize()(self.dasktable)
+
+    @benchmark(number=5)
+    def bench_transform_dense(self):
+        self.table.transform(self.normalized_domain)
+
+    @benchmark(number=5)
+    def bench_transform_dask(self):
+        self.dasktable.transform(self.normalized_domain)
+
+    @benchmark(number=5)
+    def bench_transform_dask_values(self):
+        self.normalized_dasktable.X.compute()

From fc05d700c883ad721a60a1efc38fda4498ae28c1 Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Wed, 5 Jul 2023 10:49:44 +0200
Subject: [PATCH 3/9] Benchmark SklImpute and some combinations

---
 benchmark/bench_dask_preprocess.py | 59 ++++++++++++++++++++++++++++++
 benchmark/bench_normalize.py       | 40 --------------------
 2 files changed, 59 insertions(+), 40 deletions(-)
 create mode 100644 benchmark/bench_dask_preprocess.py
 delete mode 100644 benchmark/bench_normalize.py

diff --git a/benchmark/bench_dask_preprocess.py b/benchmark/bench_dask_preprocess.py
new file mode 100644
index 00000000000..bdbec032b41
--- /dev/null
+++ b/benchmark/bench_dask_preprocess.py
@@ -0,0 +1,59 @@
+import numpy as np
+
+from Orange.data import Table, ContinuousVariable, Domain
+from Orange.preprocess import Normalize, SklImpute, PreprocessorList
+from Orange.tests.test_dasktable import temp_dasktable
+
+from .base import Benchmark, benchmark
+
+
+class BenchNormalize(Benchmark):
+
+    preprocessor = Normalize()
+
+    @classmethod
+    def create_data(cls):
+        rows = 10000
+        cols = 1000
+        return Table.from_numpy(
+            Domain([ContinuousVariable(str(i)) for i in range(cols)]),
+            np.random.RandomState(0).rand(rows, cols))
+
+    @classmethod
+    def setUpClass(cls):
+        cls.table = cls.create_data()
+        cls.dasktable = temp_dasktable(cls.table)
+        cls.preprocessed_domain = cls.preprocessor(cls.table).domain
+        cls.preprocessed_dasktable = cls.dasktable.transform(cls.preprocessed_domain)
+
+    @benchmark(number=3, warmup=1)
+    def bench_run_dense(self):
+        self.preprocessor(self.table)
+
+    @benchmark(number=3, warmup=1)
+    def bench_run_dask(self):
+        self.preprocessor(self.dasktable)
+
+    @benchmark(number=3, warmup=1)
+    def bench_transform_dense(self):
+        self.table.transform(self.preprocessed_domain)
+
+    @benchmark(number=3, warmup=1)
+    def bench_transform_dask(self):
+        self.dasktable.transform(self.preprocessed_domain)
+
+    @benchmark(number=3, warmup=1)
+    def bench_transform_dask_values(self):
+        self.preprocessed_dasktable.X.compute()
+
+
+class BenchSkImpute(BenchNormalize):
+    preprocessor = SklImpute()
+
+
+class BenchNormalizeImpute(BenchNormalize):
+    preprocessor = PreprocessorList([Normalize(), SklImpute()])
+
+
+class BenchImputeNormalize(BenchNormalize):
+    preprocessor = PreprocessorList([SklImpute(), Normalize()])

diff --git a/benchmark/bench_normalize.py b/benchmark/bench_normalize.py
deleted file mode 100644
index d78677822c7..00000000000
--- a/benchmark/bench_normalize.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import numpy as np
-
-from Orange.data import Table, ContinuousVariable, Domain
-from Orange.preprocess import Normalize
-from Orange.tests.test_dasktable import temp_dasktable
-
-from .base import Benchmark, benchmark
-
-
-class BenchNormalize(Benchmark):
-
-    def setUp(self):
-        rows = 10000
-        cols = 1000
-        self.table = Table.from_numpy(  # pylint: disable=W0201
-            Domain([ContinuousVariable(str(i)) for i in range(cols)]),
-            np.random.RandomState(0).rand(rows, cols))
-        self.dasktable = temp_dasktable(self.table)
-        self.normalized_domain = Normalize()(self.table).domain
-        self.normalized_dasktable = self.dasktable.transform(self.normalized_domain)
-
-    @benchmark(number=5)
-    def bench_normalize_dense(self):
-        Normalize()(self.table)
-
-    @benchmark(number=5)
-    def bench_normalize_dask(self):
-        Normalize()(self.dasktable)
-
-    @benchmark(number=5)
-    def bench_transform_dense(self):
-        self.table.transform(self.normalized_domain)
-
-    @benchmark(number=5)
-    def bench_transform_dask(self):
-        self.dasktable.transform(self.normalized_domain)
-
-    @benchmark(number=5)
-    def bench_transform_dask_values(self):
-        self.normalized_dasktable.X.compute()
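Editorial sketch, not part of the patch: the two combined benchmark classes
differ only in preprocessor order. `PreprocessorList` applies its members in
sequence, so the composed transformation chain on the resulting domain — and
therefore the work this series optimizes — depends on that order.

```python
# minimal usage sketch with Orange's public API
from Orange.data import Table
from Orange.preprocess import Normalize, SklImpute, PreprocessorList

data = Table("iris")
norm_then_impute = PreprocessorList([Normalize(), SklImpute()])(data)
impute_then_norm = PreprocessorList([SklImpute(), Normalize()])(data)
# both yield valid tables; only the composition order of compute values differs
```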
From 66a20f9b0add3c9aad3791806de01963fb509704 Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Wed, 26 Oct 2022 14:57:01 +0200
Subject: [PATCH 4/9] Transforming by subarrays that passes tests

---
 Orange/data/dask.py  |   5 --
 Orange/data/table.py | 188 +++++++++++++++++++++++++++++--------------
 Orange/data/util.py  |  12 +++
 3 files changed, 139 insertions(+), 66 deletions(-)

diff --git a/Orange/data/dask.py b/Orange/data/dask.py
index 6bb0f077fd1..a0c9e8aaef6 100644
--- a/Orange/data/dask.py
+++ b/Orange/data/dask.py
@@ -35,11 +35,6 @@ def join_partial_results(self, parts):
             return dask.array.vstack(parts)
         return super().join_partial_results(parts)

-    def prepare_column(self, col_array):
-        if self.is_dask:
-            return col_array.reshape(-1, 1)
-        return super().prepare_column(self, col_array)
-
     def join_columns(self, data):
         if self.is_dask:
             return dask.array.hstack(data)
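Editorial sketch, not part of the patch: why the per-column `prepare_column`
hook can go away. Assembling a result from a few contiguous blocks with one
`hstack` is much cheaper than reshaping and stacking every column separately —
the same argument holds for `dask.array.hstack` on lazy arrays.

```python
import numpy as np

rng = np.random.RandomState(0)
X = rng.rand(1000, 500)

# per-column assembly: 500 reshapes plus one stack of 500 pieces
slow = np.hstack([X[:, i].reshape(-1, 1) for i in range(X.shape[1])])

# grouped assembly: two views, one copy
fast = np.hstack([X[:, :250], X[:, 250:]])
assert np.array_equal(slow, fast)
```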
diff --git a/Orange/data/table.py b/Orange/data/table.py
index 82bf39fa9b2..7fd975a8b37 100644
--- a/Orange/data/table.py
+++ b/Orange/data/table.py
@@ -8,6 +8,7 @@
 from collections.abc import Iterable, Sequence, Sized
 from contextlib import contextmanager
 from copy import deepcopy
+from enum import Enum
 from functools import reduce
 from itertools import chain
 from numbers import Real, Integral
@@ -28,7 +29,8 @@
     DomainConversion)
 from Orange.data.util import SharedComputeValue, \
     assure_array_dense, assure_array_sparse, \
-    assure_column_dense, assure_column_sparse, get_unique_names_duplicates
+    assure_column_dense, assure_column_sparse, get_unique_names_duplicates, \
+    SubarrayComputeValue
 from Orange.misc.collections import frozendict
 from Orange.statistics.util import bincount, countnans, contingency, \
     stats as fast_stats, sparse_has_implicit_zeros, sparse_count_implicit_zeros, \
@@ -218,6 +220,16 @@ def _compute_column(func, *args, **kwargs):
     return col


+class Conversion(Enum):
+    X = 0
+    Y = 1
+    METAS = 2
+    SEPARATE = 10
+    SHARED = 11
+    SUBARRAY = 12
+    UNKNOWN = 99
+
+
 class _ArrayConversion:
     def __init__(self, target, src_cols, variables, is_sparse, source_domain):
         self.target = target
@@ -232,6 +244,10 @@ def __init__(self, target, src_cols, variables, is_sparse, source_domain):
         self.dtype = dtype
         self.row_selection_needed = any(not isinstance(x, Integral)
                                         for x in src_cols)
+        self.transform_groups = self._create_groups(source_domain)
+        self.match_density = (
+            assure_array_sparse if self.is_sparse else assure_array_dense
+        )

     def _can_copy_all(self, src_cols, source_domain):
         n_src_attrs = len(source_domain.attributes)
@@ -244,6 +260,51 @@ def _can_copy_all(self, src_cols, source_domain):
         if all(isinstance(x, Integral) and n_src_attrs <= x
                for x in src_cols):
             return "Y"

+    def _create_groups(self, source_domain):
+        n_src_attrs = len(source_domain.attributes)
+
+        groups = []
+
+        def add_group(desc, group):
+            if not group:
+                return  # skip adding empty groups
+            if desc[0] in {Conversion.X, Conversion.Y, Conversion.METAS,
+                           Conversion.SUBARRAY}:
+                group = _optimize_indices(group, 10e30)  # maxlen should not be an issue
+            groups.append((desc, group))
+
+        current_group = []
+        current_desc = None
+        for i, col in enumerate(self.src_cols):
+            if col is None:
+                desc = (Conversion.UNKNOWN, self.variables[i].Unknown)
+            elif not isinstance(col, Integral):
+                if isinstance(col, SubarrayComputeValue):
+                    desc = (Conversion.SUBARRAY, col.compute_shared)
+                    col = col.index
+                elif isinstance(col, SharedComputeValue):
+                    desc = (Conversion.SHARED, col.compute_shared)
+                else:
+                    desc = (Conversion.SEPARATE, i)  # add index to guarantee non-repetition
+            elif col < 0:
+                desc = (Conversion.METAS,)
+                col = -1 - col
+            elif col < n_src_attrs:
+                desc = (Conversion.X,)
+            else:
+                desc = (Conversion.Y,)
+                col = col - n_src_attrs
+
+            if current_desc == desc:
+                current_group.append(col)
+            else:
+                add_group(current_desc, current_group)
+                current_group = [col]
+                current_desc = desc
+
+        add_group(current_desc, current_group)
+
+        return groups
+
     def get_subarray(self, source, row_indices):
         n_rows = _selection_length(row_indices, len(source))
         if not len(self.src_cols):
@@ -252,16 +313,15 @@ def get_subarray(self, source, row_indices):
         else:
             return np.zeros((n_rows, 0), dtype=source.X.dtype)

-        match_density = assure_array_sparse if self.is_sparse else assure_array_dense
         n_src_attrs = len(source.domain.attributes)
         if self.subarray_from == "X":
-            arr = match_density(_subarray(source.X, row_indices, self.src_cols))
+            arr = self.match_density(_subarray(source.X, row_indices, self.src_cols))
         elif self.subarray_from == "metas":
-            arr = match_density(_subarray(source.metas, row_indices,
-                                          [-1 - x for x in self.src_cols]))
+            arr = self.match_density(_subarray(source.metas, row_indices,
+                                               [-1 - x for x in self.src_cols]))
         elif self.subarray_from == "Y":
             Y = source.Y if source.Y.ndim == 2 else source.Y[:, None]
-            arr = match_density(_subarray(
+            arr = self.match_density(_subarray(
                 Y, row_indices,
                 [x - n_src_attrs for x in self.src_cols]))
         else:
@@ -271,14 +331,7 @@ def get_subarray(self, source, row_indices):
         assert arr.ndim == 2 or self.subarray_from == "Y" and arr.ndim == 1
         return arr

-    def get_columns(self, source, row_indices, out=None, target_indices=None):
-        n_rows = _selection_length(row_indices, len(source))
-        n_src_attrs = len(source.domain.attributes)
-
-        data = []
-        match_density = (
-            assure_column_sparse if self.is_sparse else assure_column_dense
-        )
+    def prepare_parts(self, source, row_indices, n_rows):

         # converting to csc before instead of each column is faster
         # do not convert if not required
@@ -298,64 +351,73 @@ def get_columns(self, source, row_indices, out=None, target_indices=None):
             sourceri = source[row_indices]

         shared_cache = _thread_local.conversion_cache
-        for i, col in enumerate(self.src_cols):
-            if col is None:
-                col_array = match_density(
-                    np.full((n_rows, 1), self.variables[i].Unknown)
-                )
-            elif not isinstance(col, Integral):
-                if isinstance(col, SharedComputeValue):
-                    shared = _idcache_restore(shared_cache, (col.compute_shared, source))
-                    if shared is None:
-                        shared = col.compute_shared(sourceri)
-                        _idcache_save(shared_cache, (col.compute_shared, source), shared)
-                    col_array = match_density(
-                        _compute_column(col, sourceri, shared_data=shared))
-                else:
-                    col_array = match_density(_compute_column(col, sourceri))
-            elif col < 0:
-                col_array = match_density(
-                    source.metas[row_indices, -1 - col]
-                )
-            elif col < n_src_attrs:
-                col_array = match_density(X[row_indices, col])
+        for desc, cols in self.transform_groups:
+
+            if desc[0] == Conversion.UNKNOWN:
+                yield np.full((n_rows, len(cols)), desc[1])
+
+            elif desc[0] == Conversion.SHARED:
+                compute_shared = desc[1]
+                shared = _idcache_restore(shared_cache, desc[1:] + (source,))
+                if shared is None:
+                    shared = compute_shared(sourceri)
+                    _idcache_save(shared_cache, desc[1:] + (source,), shared)
+                t = []
+                for c in cols:
+                    t.append(self.match_density(
+                        c(sourceri, shared_data=shared).reshape(-1, 1)))
+                yield self.join_columns(t)
+
+            elif desc[0] == Conversion.SUBARRAY:
+                compute_shared = desc[1]
+                shared = compute_shared(sourceri, cols)
+                yield shared
+
+            elif desc[0] == Conversion.SEPARATE:
+                r = cols[0](sourceri)
+                if not hasattr(r, "shape"):
+                    yield np.broadcast_to(r, (n_rows, 1))
+                else:
+                    yield r.reshape(n_rows, 1)
+
+            elif desc[0] == Conversion.METAS:
+                yield _sa(source.metas, row_indices, cols)
+
+            elif desc[0] == Conversion.X:
+                yield _sa(X, row_indices, cols)
+
+            elif desc[0] == Conversion.Y:
+                yield _sa(Y, row_indices, cols)
+
             else:
-                col_array = match_density(
-                    Y[row_indices, col - n_src_attrs]
-                )
+                raise Exception("Unknown conversion type")
+
+    def get_columns(self, source, row_indices, out=None, target_indices=None):
+        n_rows = _selection_length(row_indices, len(source))
+
+        data = []
+
+        cpos = 0
+        for col_array in self.prepare_parts(source, row_indices, n_rows):
+            col_array = self.match_density(col_array)
+            rows, cols = col_array.shape
             if self.results_inplace:
-                out[target_indices, i] = col_array
+                out[target_indices, slice(cpos, cpos + cols)] = col_array
             else:
-                data.append(self.prepare_column(col_array))
+                data.append(col_array)
+            cpos += cols

         if self.results_inplace:
             return out
         else:
             return self.join_columns(data)

-    def prepare_column(self, col_array):
-        return col_array
-
     def join_columns(self, data):
         if self.is_sparse:
-            # creating csr directly would need plenty of manual work which
-            # would probably slow down the process - conversion coo to csr
-            # is fast
-            coo_data = []
-            coo_col = []
-            coo_row = []
-            for i, col_array in enumerate(data):
-                coo_data.append(col_array.data)
-                coo_col.append(np.full(len(col_array.data), i))
-                coo_row.append(col_array.indices)  # row indices should be same
-                n_rows = col_array.shape[0]  # pylint: disable=undefined-loop-variable
-            out = sp.coo_matrix(
-                (np.hstack(coo_data), (np.hstack(coo_row), np.hstack(coo_col))),
-                shape=(n_rows, len(self.src_cols)),
-                dtype=self.dtype
-            )
-            return out.tocsr()
+            return sp.hstack(data)
+        else:
+            return np.hstack(data)

     def join_partial_results(self, parts):
         if self.is_sparse:
@@ -2451,7 +2513,11 @@ def _subarray(arr, rows, cols):
     if arr.ndim == 1:
         return arr[rows]
     cols = _optimize_indices(cols, arr.shape[1])
-    if isinstance(rows, slice) or isinstance(cols, slice):
+    return _sa(arr, rows, cols)
+
+
+def _sa(arr, rows, cols):
+    if isinstance(rows, slice) or isinstance(cols, slice) or rows is ... or cols is ...:
         return arr[rows, cols]
     else:
         # rows and columns are independent selectors,
@@ -2469,7 +2535,7 @@ def _optimize_indices(indices, size):
     exception. An IndexError is raised if boolean indices do not conform to
     input size.

-    Allows numpy to reuse the data array, because it defaults to copying
+    Allows numpy to reuse the data array, because numpy defaults to copying
     if given indices.

     Parameters
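Editorial sketch, not part of the patch: what `_optimize_indices` buys in the
diff above. Basic slicing of a numpy array returns a view, while fancy
indexing copies, so converting contiguous index lists to slices before bulk
subarray access avoids copies entirely.

```python
import numpy as np

X = np.zeros((1000, 4))
view = X[:, 1:4]        # slice: a view, no copy
copy = X[:, [1, 2, 3]]  # fancy indexing: a fresh copy
assert view.base is X
assert copy.base is not X
```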
diff --git a/Orange/data/util.py b/Orange/data/util.py
index 245895efcc3..f13dc2dde3e 100644
--- a/Orange/data/util.py
+++ b/Orange/data/util.py
@@ -103,6 +103,18 @@ def __hash__(self):
         return hash((type(self), self.compute_shared, self.variable))


+class SubarrayComputeValue:
+
+    def __init__(self, compute_shared, index):
+        self.compute_shared = compute_shared
+        self.index = index
+
+    def __call__(self, data, shared_data=None):
+        """Fallback if common parts are not passed."""
+        shared_data = self.compute_shared(data, [self.index])
+        return shared_data
+
+
 def vstack(arrays):
     """vstack that supports sparse and dense arrays

From a3ab81a32a3e9ad2554db6452571917bcf8a2cfa Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Fri, 14 Jul 2023 11:00:42 +0200
Subject: [PATCH 5/9] SubarrayComputeValue: __eq__ and __hash__

---
 Orange/data/util.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/Orange/data/util.py b/Orange/data/util.py
index f13dc2dde3e..fbae316c74d 100644
--- a/Orange/data/util.py
+++ b/Orange/data/util.py
@@ -103,17 +103,24 @@ def __hash__(self):
         return hash((type(self), self.compute_shared, self.variable))


-class SubarrayComputeValue:
+class SubarrayComputeValue(SharedComputeValue):

-    def __init__(self, compute_shared, index):
-        self.compute_shared = compute_shared
+    def __init__(self, compute_shared, index, variable=None):
+        super().__init__(compute_shared, variable)
         self.index = index

     def __call__(self, data, shared_data=None):
-        """Fallback if common parts are not passed."""
+        """Fallback."""
         shared_data = self.compute_shared(data, [self.index])
         return shared_data

+    def __eq__(self, other):
+        return super().__eq__(other) \
+            and self.index == other.index
+
+    def __hash__(self):
+        return hash((super().__hash__(), self.index))
+

 def vstack(arrays):
     """vstack that supports sparse and dense arrays
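Editorial sketch, not part of the patch: the contract these two patches
establish. A `SubarrayComputeValue`'s shared function receives `(data, cols)`
and returns a 2d block for all requested columns at once; calling a single
value on its own falls back to `cols=[index]`.

```python
import numpy as np
import Orange
from Orange.data.util import SubarrayComputeValue

plus_one = lambda data, cols: data.X[:, cols] + 1
cv = SubarrayComputeValue(plus_one, 2)

iris = Orange.data.Table("iris")
block = plus_one(iris, [0, 2])  # grouped path: one call, two columns
single = cv(iris)               # fallback path: cols=[2]
np.testing.assert_equal(single[:, 0], block[:, 1])
```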
From 4570d08f912d6e0b27369aca0d90665730d1d2d5 Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Wed, 26 Oct 2022 14:56:30 +0200
Subject: [PATCH 6/9] Normalization through subarrays

---
 Orange/preprocess/normalize.py | 49 ++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/Orange/preprocess/normalize.py b/Orange/preprocess/normalize.py
index 8e9206f697f..5847c45354e 100644
--- a/Orange/preprocess/normalize.py
+++ b/Orange/preprocess/normalize.py
@@ -1,6 +1,8 @@
 import numpy as np
+import scipy.sparse as sp

 from Orange.data import Domain, ContinuousVariable
+from Orange.data.util import SubarrayComputeValue
 from Orange.statistics import basic_stats
 from Orange.util import Reprable
 from .preprocess import Normalize
@@ -8,6 +10,52 @@
 __all__ = ["Normalizer"]


+class SubarrayNorms:
+
+    def __init__(self, source_vars, offsets, factors):
+        self.source_vars = tuple(source_vars)
+        self.offsets = np.array(offsets)
+        self.factors = np.array(factors)
+
+    def __call__(self, data, cols):
+        X = data.transform(Domain(self.source_vars[cols])).X
+        offsets = self.offsets[cols]
+        factors = self.factors[cols]
+
+        if sp.issparse(X):
+            if np.any(offsets != 0):
+                raise ValueError("Normalization does not work for sparse data.")
+            return X.multiply(factors.reshape(1, -1))  # the "-" operation returns dense
+        else:
+            return (X - offsets.reshape(1, -1)) * factors.reshape(1, -1)
+
+
+def compress_norm_to_subarray(domain):
+    source_vars = []
+    offsets = []
+    factors = []
+
+    for a in domain.attributes:
+        if isinstance(a.compute_value, Norm):
+            tr = a.compute_value
+            source_vars.append(tr.variable)
+            offsets.append(tr.offset)
+            factors.append(tr.factor)
+
+    st = SubarrayNorms(source_vars, offsets, factors)
+
+    new_atts = []
+    ind = 0
+    for a in domain.attributes:
+        if isinstance(a.compute_value, Norm):
+            cv = SubarrayComputeValue(st, ind, a.compute_value.variable)
+            a = a.copy(compute_value=cv)
+            ind += 1
+        new_atts.append(a)
+
+    return Domain(new_atts, domain.class_vars, domain.metas)
+
+
 class Normalizer(Reprable):
     def __init__(self,
                  zero_based=True,
@@ -33,6 +81,7 @@ def __call__(self, data):
                           (i, var) in enumerate(data.domain.class_vars)]

         domain = Domain(new_attrs, new_class_vars, data.domain.metas)
+        domain = compress_norm_to_subarray(domain)
         return data.transform(domain)

     def normalize(self, stats, var):
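Editorial sketch, not part of the patch, assuming `Norm` in normalize.py is
Orange's `transformation.Normalizer` (offset/factor transform): what the
compression does. Consecutive attributes index their shared `SubarrayNorms`
with 0, 1, 2, ..., so `_create_groups` later collapses them to a slice —
which is also why indexing the `source_vars` tuple with `cols` works on the
grouped path.

```python
import numpy as np
from Orange.data import Table, Domain, ContinuousVariable
from Orange.preprocess.transformation import Normalizer as Norm
from Orange.preprocess.normalize import compress_norm_to_subarray

iris = Table("iris")
attrs = [ContinuousVariable(a.name + "_n", compute_value=Norm(a, 0.0, 2.0))
         for a in iris.domain.attributes]
plain = Domain(attrs)                          # four separate Norm values
compressed = compress_norm_to_subarray(plain)  # one shared SubarrayNorms
out = iris.transform(compressed)
np.testing.assert_allclose(out.X, iris.X * 2.0)
```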
From f6f9bdffa935a4f817e1124e2fa42fad8365d3fc Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Tue, 11 Jul 2023 14:07:19 +0200
Subject: [PATCH 7/9] SklImpute as SubarrayComputeValue

WITH BRANCH

[run_dask] with 3 loops, best of 3: min 910 msec per loop avg 943 msec per loop
[run_dense] with 3 loops, best of 3: min 858 msec per loop avg 888 msec per loop
[transform_dask] with 3 loops, best of 3: min 44.4 msec per loop avg 44.9 msec per loop
[transform_dask_values] with 3 loops, best of 3: min 258 msec per loop avg 407 msec per loop
[transform_dense] with 3 loops, best of 3: min 600 msec per loop avg 629 msec per loop

[run_dask] with 3 loops, best of 3: min 481 msec per loop avg 504 msec per loop
[run_dense] with 3 loops, best of 3: min 669 msec per loop avg 695 msec per loop
[transform_dask] with 3 loops, best of 3: min 31.7 msec per loop avg 31.8 msec per loop
[transform_dask_values] with 3 loops, best of 3: min 327 msec per loop avg 349 msec per loop
[transform_dense] with 3 loops, best of 3: min 342 msec per loop avg 365 msec per loop

[run_dask] with 3 loops, best of 3: min 1.08 sec per loop avg 1.29 sec per loop
[run_dense] with 3 loops, best of 3: min 1.31 sec per loop avg 1.34 sec per loop
[transform_dask] with 3 loops, best of 3: min 45.6 msec per loop avg 46 msec per loop
[transform_dask_values] with 3 loops, best of 3: min 430 msec per loop avg 589 msec per loop
[transform_dense] with 3 loops, best of 3: min 583 msec per loop avg 639 msec per loop

[run_dask] with 3 loops, best of 3: min 203 msec per loop avg 235 msec per loop
[run_dense] with 3 loops, best of 3: min 476 msec per loop avg 529 msec per loop
[transform_dask] with 3 loops, best of 3: min 30.4 msec per loop avg 31 msec per loop
[transform_dask_values] with 3 loops, best of 3: min 85.1 msec per loop avg 174 msec per loop
[transform_dense] with 3 loops, best of 3: min 262 msec per loop avg 271 msec per loop

[normalize_only_parameters] with 5 loops, best of 3: min 53.4 msec per loop avg 54.7 msec per loop
[normalize_only_transform] with 5 loops, best of 3: min 35.7 msec per loop avg 35.9 msec per loop
[sklimpute] with 5 loops, best of 3: min 65.4 msec per loop avg 66.3 msec per loop

BEFORE

[run_dask] with 3 loops, best of 3: min 17 sec per loop avg 18 sec per loop
[run_dense] with 3 loops, best of 3: min 1.76 sec per loop avg 1.83 sec per loop
[transform_dask] with 3 loops, best of 3: min 3.67 sec per loop avg 3.72 sec per loop
[transform_dask_values] with 3 loops, best of 3: min 1.55 sec per loop avg 1.57 sec per loop
[transform_dense] with 3 loops, best of 3: min 1.98 sec per loop avg 1.99 sec per loop

[run_dask] with 3 loops, best of 3: min 2.6 sec per loop avg 2.66 sec per loop
[run_dense] with 3 loops, best of 3: min 1.08 sec per loop avg 1.08 sec per loop
[transform_dask] with 3 loops, best of 3: min 2.08 sec per loop avg 2.08 sec per loop
[transform_dask_values] with 3 loops, best of 3: min 1.02 sec per loop avg 1.04 sec per loop
[transform_dense] with 3 loops, best of 3: min 763 msec per loop avg 765 msec per loop

[run_dask] with 3 loops, best of 3: min 14.1 sec per loop avg 14.4 sec per loop
[run_dense] with 3 loops, best of 3: min 1.95 sec per loop avg 1.98 sec per loop
[transform_dask] with 3 loops, best of 3: min 3.74 sec per loop avg 3.76 sec per loop
[transform_dask_values] with 3 loops, best of 3: min 1.51 sec per loop avg 1.6 sec per loop
[transform_dense] with 3 loops, best of 3: min 1.91 sec per loop avg 1.93 sec per loop

[run_dask] with 3 loops, best of 3: min 1.74 sec per loop avg 1.85 sec per loop
[run_dense] with 3 loops, best of 3: min 1.01 sec per loop avg 1.02 sec per loop
[transform_dask] with 3 loops, best of 3: min 1.6 sec per loop avg 1.63 sec per loop
[transform_dask_values] with 3 loops, best of 3: min 1 sec per loop avg 1.02 sec per loop
[transform_dense] with 3 loops, best of 3: min 846 msec per loop avg 865 msec per loop

[normalize_only_parameters] with 5 loops, best of 3: min 55.5 msec per loop avg 55.8 msec per loop
[normalize_only_transform] with 5 loops, best of 3: min 118 msec per loop avg 119 msec per loop
[sklimpute] with 5 loops, best of 3: min 154 msec per loop avg 157 msec per loop
---
 Orange/preprocess/preprocess.py | 50 +++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/Orange/preprocess/preprocess.py b/Orange/preprocess/preprocess.py
index d77598c3a6b..a6195ee2d44 100644
--- a/Orange/preprocess/preprocess.py
+++ b/Orange/preprocess/preprocess.py
@@ -21,6 +21,8 @@
     "ProjectPCA", "ProjectCUR", "Scale", "RemoveSparse",
     "AdaptiveNormalize", "PreprocessorList"]

+from ..data.util import SubarrayComputeValue
+

 class Preprocess(Reprable):
     """
@@ -148,6 +150,53 @@ def __call__(self, data):
         return data.transform(domain)


+class SubarrayImpute:
+
+    def __init__(self, source_vars, vals):
+        self.source_vars = tuple(source_vars)
+        self.vals = np.array(vals)
+
+    def __call__(self, data, cols):
+        X = data.transform(Orange.data.Domain(self.source_vars[cols])).X
+        vals = self.vals[cols]
+
+        if sp.issparse(X):
+            X = sp.csc_matrix(X, copy=True)
+            # this is what scikit-learn does for sparse data
+            nans = np.isnan(X.data)
+            col_indexes = np.repeat(
+                np.arange(len(X.indptr) - 1, dtype=int), np.diff(X.indptr)
+            )
+            X.data[nans] = vals[col_indexes[nans]]
+            return X
+        else:
+            return np.where(np.isnan(X), vals.reshape(1, -1), X)
+
+
+def compress_replace_unknowns_to_subarray(domain):
+    source_vars = []
+    vals = []
+
+    for a in domain.attributes:
+        if isinstance(a.compute_value, impute.ReplaceUnknowns):
+            tr = a.compute_value
+            source_vars.append(tr.variable)
+            vals.append(tr.value)
+
+    st = SubarrayImpute(source_vars, vals)
+
+    new_atts = []
+    ind = 0
+    for a in domain.attributes:
+        if isinstance(a.compute_value, impute.ReplaceUnknowns):
+            cv = SubarrayComputeValue(st, ind, a.compute_value.variable)
+            a = a.copy(compute_value=cv)
+            ind += 1
+        new_atts.append(a)
+
+    return Orange.data.Domain(new_atts, domain.class_vars, domain.metas)
+
+
 class SklImpute(Preprocess):
     __wraps__ = SimpleImputer

@@ -176,6 +225,7 @@ def __call__(self, data):
                     if not np.isnan(value)]
         domain = Orange.data.Domain(features, data.domain.class_vars,
                                     data.domain.metas)
+        domain = compress_replace_unknowns_to_subarray(domain)
         new_data = data.transform(domain)
         return new_data
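Editorial sketch, not part of the patch: the sparse branch of
`SubarrayImpute.__call__` on a toy CSC matrix. `np.repeat` over
`np.diff(indptr)` maps each stored value back to its column, so NaNs can be
replaced by that column's imputation value without densifying.

```python
import numpy as np
import scipy.sparse as sp

X = sp.csc_matrix(np.array([[1.0, np.nan],
                            [0.0, 3.0]]))
vals = np.array([9.0, 7.0])  # per-column replacement values

nans = np.isnan(X.data)
col_indexes = np.repeat(np.arange(len(X.indptr) - 1, dtype=int),
                        np.diff(X.indptr))
X.data[nans] = vals[col_indexes[nans]]
assert X.toarray()[0, 1] == 7.0
```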
From e1115b860d51093b548066c0fb7881abb15e4ab6 Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Fri, 14 Jul 2023 15:16:53 +0200
Subject: [PATCH 8/9] Test SubarrayComputeValue: ensure it is not called too
 often

---
 Orange/tests/test_data_util.py | 59 +++++++++++++++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/Orange/tests/test_data_util.py b/Orange/tests/test_data_util.py
index 3af720d4186..8288c509f97 100644
--- a/Orange/tests/test_data_util.py
+++ b/Orange/tests/test_data_util.py
@@ -4,7 +4,7 @@

 import numpy as np

-from Orange.data.util import scale, one_hot, SharedComputeValue
+from Orange.data.util import scale, one_hot, SharedComputeValue, SubarrayComputeValue
 import Orange


 class TestDataUtil(unittest.TestCase):
@@ -145,3 +145,60 @@ def test_eq_hash(self):

         self.assertNotEqual(c1, e)
         self.assertNotEqual(hash(c1), hash(e))
+
+
+class DummyPlusSubarray(SubarrayComputeValue):
+    pass
+
+
+class TestSubarrayComputeValue(unittest.TestCase):
+
+    def test_values(self):
+        fn = lambda data, cols: data.X[:, cols] + 1
+        cv1 = DummyPlusSubarray(fn, 1)
+        cv2 = DummyPlusSubarray(fn, 3)
+        iris = Orange.data.Table("iris")
+        domain = Orange.data.Domain([
+            Orange.data.ContinuousVariable("cv1", compute_value=cv1),
+            Orange.data.ContinuousVariable("cv2", compute_value=cv2)
+        ])
+        data = iris.transform(domain)
+        np.testing.assert_equal(iris.X[:, [1, 3]] + 1, data.X)
+
+    def test_with_row_indices(self):
+        fn = lambda data, cols: data.X[:, cols] + 1
+        cv = DummyPlusSubarray(fn, 1)
+        iris = Orange.data.Table("iris")
+        domain = Orange.data.Domain(
+            [Orange.data.ContinuousVariable("cv", compute_value=cv)])
+        data1 = Orange.data.Table.from_table(domain, iris)[10:20]
+        data2 = Orange.data.Table.from_table(domain, iris, range(10, 20))
+        np.testing.assert_equal(data1.X, data2.X)
+
+    def test_single_call(self):
+        fn = lambda data, cols: data.X[:, cols] + 1
+        mockfn = Mock(side_effect=fn)
+        cvs = [DummyPlusSubarray(mockfn, i) for i in range(4)]
+        self.assertEqual(mockfn.call_count, 0)
+        data = Orange.data.Table("iris")[45:55]  # two classes
+        domain = Orange.data.Domain([at.copy(compute_value=cv)
+                                     for at, cv in zip(data.domain.attributes, cvs)],
+                                    data.domain.class_vars)
+
+        assert cvs[0].compute_shared is mockfn
+
+        Orange.data.Table.from_table(domain, data)
+        self.assertEqual(mockfn.call_count, 1)
+        ndata = Orange.data.Table.from_table(domain, data)
+        self.assertEqual(mockfn.call_count, 2)
+
+        np.testing.assert_equal(ndata.X, data.X + 1)
+
+        # learner performs imputation
+        c = Orange.classification.LogisticRegressionLearner()(ndata)
+        self.assertEqual(mockfn.call_count, 2)
+        c(data)  # new data should be converted with one call
+        self.assertEqual(mockfn.call_count, 3)
+
+        # test with descendants of table
+        DummyTable.from_table(c.domain, data)
+        self.assertEqual(mockfn.call_count, 4)
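Editorial sketch, not part of the patch: the pattern the next patch applies to
`SubarrayNorms` and `SubarrayImpute` — compute an expensive hash lazily, at
most once per instance, and drop the cache when pickling so a stale value is
never restored.

```python
class CachedHash:
    # real classes pair this with a matching __eq__
    def __init__(self, values):
        self.values = tuple(values)
        self._hash = None

    def __hash__(self):
        if self._hash is None:   # computed at most once per instance
            self._hash = hash(self.values)
        return self._hash

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["_hash"]       # never pickle the cache
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._hash = None
```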
From cbedefac0ad93ca841e2b7691d9db84c8c7a24c4 Mon Sep 17 00:00:00 2001
From: Marko Toplak
Date: Tue, 18 Jul 2023 13:04:02 +0200
Subject: [PATCH 9/9] SubarrayNorms and SubarrayImpute get __eq__ and __hash__

---
 Orange/preprocess/normalize.py  | 23 +++++++++++++++++++++++
 Orange/preprocess/preprocess.py | 22 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/Orange/preprocess/normalize.py b/Orange/preprocess/normalize.py
index 5847c45354e..1296a93cd24 100644
--- a/Orange/preprocess/normalize.py
+++ b/Orange/preprocess/normalize.py
@@ -16,6 +16,7 @@ def __init__(self, source_vars, offsets, factors):
         self.source_vars = tuple(source_vars)
         self.offsets = np.array(offsets)
         self.factors = np.array(factors)
+        self._hash = None

     def __call__(self, data, cols):
         X = data.transform(Domain(self.source_vars[cols])).X
@@ -29,6 +30,28 @@ def __call__(self, data, cols):
         else:
             return (X - offsets.reshape(1, -1)) * factors.reshape(1, -1)

+    def __eq__(self, other):
+        if self is other:
+            return True
+        return type(self) is type(other) \
+            and self.source_vars == other.source_vars \
+            and np.all(self.offsets == other.offsets) \
+            and np.all(self.factors == other.factors)
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._hash = None
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state["_hash"]
+        return state
+
+    def __hash__(self):
+        if self._hash is None:
+            self._hash = hash((self.source_vars, tuple(self.offsets),
+                               tuple(self.factors)))
+        return self._hash
+

 def compress_norm_to_subarray(domain):
     source_vars = []

diff --git a/Orange/preprocess/preprocess.py b/Orange/preprocess/preprocess.py
index a6195ee2d44..44f7cc6f6f2 100644
--- a/Orange/preprocess/preprocess.py
+++ b/Orange/preprocess/preprocess.py
@@ -155,6 +155,7 @@ class SubarrayImpute:
     def __init__(self, source_vars, vals):
         self.source_vars = tuple(source_vars)
         self.vals = np.array(vals)
+        self._hash = None

     def __call__(self, data, cols):
         X = data.transform(Orange.data.Domain(self.source_vars[cols])).X
@@ -172,6 +173,27 @@ def __call__(self, data, cols):
         else:
             return np.where(np.isnan(X), vals.reshape(1, -1), X)

+    def __eq__(self, other):
+        if self is other:
+            return True
+        return type(self) is type(other) \
+            and self.source_vars == other.source_vars \
+            and np.all(self.vals == other.vals)
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._hash = None
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state["_hash"]
+        return state
+
+    def __hash__(self):
+        if self._hash is None:
+            self._hash = hash((self.source_vars, tuple(self.vals)))
+        return self._hash
+

 def compress_replace_unknowns_to_subarray(domain):
     source_vars = []