From 7db40e9987be50808ad90bee80b4927ef0b01472 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Thu, 23 Apr 2015 09:40:45 -0400 Subject: [PATCH 1/2] Separate bag and array tests from dask --- .../{test_dask.py => test_dask_array.py} | 63 ++++--------------- odo/backends/tests/test_dask_bag.py | 38 +++++++++++ 2 files changed, 51 insertions(+), 50 deletions(-) rename odo/backends/tests/{test_dask.py => test_dask_array.py} (50%) create mode 100644 odo/backends/tests/test_dask_bag.py diff --git a/odo/backends/tests/test_dask.py b/odo/backends/tests/test_dask_array.py similarity index 50% rename from odo/backends/tests/test_dask.py rename to odo/backends/tests/test_dask_array.py index 0d959f53..4bdccaec 100644 --- a/odo/backends/tests/test_dask.py +++ b/odo/backends/tests/test_dask_array.py @@ -1,20 +1,17 @@ from __future__ import absolute_import, division, print_function import pytest + pytest.importorskip('dask') -pytest.importorskip('dask.bag') -from odo.backends.dask import append, merge +from toolz import merge +from odo.backends.dask import append from dask.array.core import insert_to_ooc, Array -from dask.bag.core import Bag from dask import core -from odo import chunks, convert, into, TextFile -from odo.utils import tmpfile, filetexts +from odo import convert, into +from odo.utils import tmpfile import numpy as np -#################### -# dask.array tests # -#################### def eq(a, b): c = a == b @@ -25,30 +22,28 @@ def eq(a, b): def test_convert(): x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) - + d = convert(Array, x, chunks=(4, 5)) assert isinstance(d, Array) def test_convert_to_numpy_array(): x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) + d = convert(Array, x, chunks=(4, 5)) x2 = convert(np.ndarray, d) - assert eq(x, x2) def test_append_to_array(): bcolz = pytest.importorskip('bcolz') x = np.arange(600).reshape((20, 30)) - a = into(Array, x, blockshape=(4, 5)) + a = into(Array, x, chunks=(4, 5)) b = bcolz.zeros(shape=(0, 30), dtype=x.dtype) append(b, a) assert eq(b[:], x) with tmpfile('hdf5') as fn: - h = into(fn+'::/data', a) + h = into(fn + '::/data', a) assert eq(h[:], x) h.file.close() @@ -56,7 +51,7 @@ def test_append_to_array(): def test_into_inplace(): bcolz = pytest.importorskip('bcolz') x = np.arange(600).reshape((20, 30)) - a = into(Array, x, blockshape=(4, 5)) + a = into(Array, x, chunks=(4, 5)) b = bcolz.zeros(shape=(20, 30), dtype=x.dtype) append(b, a, inplace=True) @@ -66,7 +61,7 @@ def test_into_inplace(): def test_insert_to_ooc(): x = np.arange(600).reshape((20, 30)) y = np.empty(shape=x.shape, dtype=x.dtype) - a = convert(Array, x, blockshape=(4, 5)) + a = convert(Array, x, chunks=(4, 5)) dsk = insert_to_ooc(y, a) core.get(merge(dsk, a.dask), list(dsk.keys())) @@ -74,40 +69,8 @@ def test_insert_to_ooc(): assert eq(y, x) -def test__array__(): +def test_array_interface(): x = np.arange(600).reshape((20, 30)) - d = convert(Array, x, blockshape=(4, 5)) + d = convert(Array, x, chunks=(4, 5)) assert eq(x, np.array(d)) - -################## -# dask.bag tests # -################## - - -def inc(x): - return x + 1 - -dsk = {('x', 0): (range, 5), - ('x', 1): (range, 5), - ('x', 2): (range, 5)} - -L = list(range(5)) * 3 - -b = Bag(dsk, 'x', 3) - -def test_convert_bag_to_list(): - assert convert(list, b) == L - -def test_convert_logfiles_to_bag(): - with filetexts({'a1.log': 'Hello\nWorld', 'a2.log': 'Hola\nMundo'}) as fns: - logs = chunks(TextFile)(list(map(TextFile, fns))) - b = convert(Bag, logs) - assert isinstance(b, Bag) - assert 'a1.log' in str(b.dask.values()) - assert convert(list, b) == convert(list, logs) - - -def test_sequence(): - b = into(Bag, [1, 2, 3]) - assert set(b.map(inc)) == set([2, 3, 4]) diff --git a/odo/backends/tests/test_dask_bag.py b/odo/backends/tests/test_dask_bag.py new file mode 100644 index 00000000..4a4e39dd --- /dev/null +++ b/odo/backends/tests/test_dask_bag.py @@ -0,0 +1,38 @@ +import pytest + +pytest.importorskip('dask.bag') + +from odo import chunks, TextFile, odo +from dask.bag import Bag +from odo.utils import filetexts + + +def inc(x): + return x + 1 + + +dsk = {('x', 0): (range, 5), + ('x', 1): (range, 5), + ('x', 2): (range, 5)} + +L = list(range(5)) * 3 + +b = Bag(dsk, 'x', 3) + + +def test_convert_bag_to_list(): + assert odo(b, list) == L + + +def test_convert_logfiles_to_bag(): + with filetexts({'a1.log': 'Hello\nWorld', 'a2.log': 'Hola\nMundo'}) as fns: + logs = chunks(TextFile)(list(map(TextFile, fns))) + b = odo(logs, Bag) + assert isinstance(b, Bag) + assert 'a1.log' in str(b.dask.values()) + assert odo(b, list) == odo(logs, list) + + +def test_sequence(): + b = odo([1, 2, 3], Bag) + assert set(b.map(inc)) == set([2, 3, 4]) From e85a0dc88cc705e2e7073a2994fa97945a5dab78 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Thu, 23 Apr 2015 09:45:30 -0400 Subject: [PATCH 2/2] Fix odo breakage from dask API changes --- odo/backends/dask.py | 45 +++++++++++++-------------- odo/backends/tests/test_dask_array.py | 6 ++-- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/odo/backends/dask.py b/odo/backends/dask.py index 8220bea3..62258623 100644 --- a/odo/backends/dask.py +++ b/odo/backends/dask.py @@ -1,23 +1,19 @@ from __future__ import absolute_import, division, print_function -from operator import add from collections import Iterator import numpy as np -from toolz import merge, accumulate +from toolz import keyfilter from datashape.dispatch import dispatch from datashape import DataShape -from dask.array.core import rec_concatenate, Array, getem, get, names, from_array +from dask.array.core import rec_concatenate, Array, get, from_array from dask.bag.core import Bag -from dask.core import flatten +import dask.bag as db from dask.compatibility import long -from odo import append, chunks, convert, discover, into, TextFile +from odo import append, chunks, convert, discover, TextFile from ..utils import keywords -############## -# dask.Array # -############## @discover.register(Array) def discover_dask_array(a, **kwargs): @@ -26,8 +22,12 @@ def discover_dask_array(a, **kwargs): arrays = [np.ndarray] + try: import h5py +except ImportError: + pass +else: arrays.append(h5py.Dataset) @dispatch(h5py.Dataset, (int, long)) @@ -39,24 +39,24 @@ def resize(x, size): @dispatch(h5py.Dataset, tuple) def resize(x, shape): return x.resize(shape) -except ImportError: - pass + try: import bcolz +except ImportError: + pass +else: arrays.append(bcolz.carray) @dispatch(bcolz.carray, (int, long)) def resize(x, size): return x.resize(size) -except ImportError: - pass @convert.register(Array, tuple(arrays), cost=1.) -def array_to_dask(x, name=None, blockshape=None, **kwargs): - if blockshape is None: - raise NotImplementedError("blockshape cannot be None") - return from_array(x, blockshape=blockshape, name=name, **kwargs) +def array_to_dask(x, name=None, chunks=None, **kwargs): + if chunks is None: + raise ValueError("chunks cannot be None") + return from_array(x, chunks=chunks, name=name, **kwargs) @convert.register(np.ndarray, Array, cost=10.) @@ -75,11 +75,9 @@ def store_Array_in_ooc_data(out, arr, inplace=False, **kwargs): # Resize output dataset to accept new data assert out.shape[1:] == arr.shape[1:] resize(out, out.shape[0] + arr.shape[0]) # elongate - return arr.store(out) + arr.store(out) + return out -############ -# dask.bag # -############ @convert.register(Iterator, Bag) def bag_to_iterator(x, **kwargs): @@ -88,11 +86,10 @@ def bag_to_iterator(x, **kwargs): @convert.register(Bag, chunks(TextFile)) def bag_to_iterator(x, **kwargs): - return Bag.from_filenames([tf.path for tf in x]) + return db.from_filenames([tf.path for tf in x]) @convert.register(Bag, list) def bag_to_iterator(x, **kwargs): - keys = keywords(Bag.from_sequence) - kwargs2 = dict((k, v) for k, v in kwargs.items() if k in keys) - return Bag.from_sequence(x, **kwargs2) + kwargs = keyfilter(keywords(db.from_sequence).__contains__, kwargs) + return db.from_sequence(x, **kwargs) diff --git a/odo/backends/tests/test_dask_array.py b/odo/backends/tests/test_dask_array.py index 4bdccaec..c16808b7 100644 --- a/odo/backends/tests/test_dask_array.py +++ b/odo/backends/tests/test_dask_array.py @@ -8,7 +8,7 @@ from odo.backends.dask import append from dask.array.core import insert_to_ooc, Array from dask import core -from odo import convert, into +from odo import convert, into, odo from odo.utils import tmpfile import numpy as np @@ -42,8 +42,8 @@ def test_append_to_array(): append(b, a) assert eq(b[:], x) - with tmpfile('hdf5') as fn: - h = into(fn + '::/data', a) + with tmpfile('.hdf5') as fn: + h = odo(a, fn + '::/data') assert eq(h[:], x) h.file.close()