Skip to content

Commit

Permalink
Merge pull request #179 from cpcloud/dask-api-change
Browse files: browse the repository at this point in the history
Fix odo breakage for dask API changes
  • Loading branch information
cpcloud committed Apr 23, 2015
2 parents 62ea6ae + e85a0dc commit fa842fd
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 137 deletions.
45 changes: 21 additions & 24 deletions odo/backends/dask.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
from __future__ import absolute_import, division, print_function
from operator import add
from collections import Iterator

import numpy as np
from toolz import merge, accumulate
from toolz import keyfilter
from datashape.dispatch import dispatch
from datashape import DataShape

from dask.array.core import rec_concatenate, Array, getem, get, names, from_array
from dask.array.core import rec_concatenate, Array, get, from_array
from dask.bag.core import Bag
from dask.core import flatten
import dask.bag as db
from dask.compatibility import long

from odo import append, chunks, convert, discover, into, TextFile
from odo import append, chunks, convert, discover, TextFile
from ..utils import keywords

##############
# dask.Array #
##############

@discover.register(Array)
def discover_dask_array(a, **kwargs):
Expand All @@ -26,8 +22,12 @@ def discover_dask_array(a, **kwargs):


arrays = [np.ndarray]

try:
import h5py
except ImportError:
pass
else:
arrays.append(h5py.Dataset)

@dispatch(h5py.Dataset, (int, long))
Expand All @@ -39,24 +39,24 @@ def resize(x, size):
@dispatch(h5py.Dataset, tuple)
def resize(x, shape):
return x.resize(shape)
except ImportError:
pass

try:
import bcolz
except ImportError:
pass
else:
arrays.append(bcolz.carray)

@dispatch(bcolz.carray, (int, long))
def resize(x, size):
return x.resize(size)
except ImportError:
pass


@convert.register(Array, tuple(arrays), cost=1.)
def array_to_dask(x, name=None, blockshape=None, **kwargs):
if blockshape is None:
raise NotImplementedError("blockshape cannot be None")
return from_array(x, blockshape=blockshape, name=name, **kwargs)
def array_to_dask(x, name=None, chunks=None, **kwargs):
if chunks is None:
raise ValueError("chunks cannot be None")
return from_array(x, chunks=chunks, name=name, **kwargs)


@convert.register(np.ndarray, Array, cost=10.)
Expand All @@ -75,11 +75,9 @@ def store_Array_in_ooc_data(out, arr, inplace=False, **kwargs):
# Resize output dataset to accept new data
assert out.shape[1:] == arr.shape[1:]
resize(out, out.shape[0] + arr.shape[0]) # elongate
return arr.store(out)
arr.store(out)
return out

############
# dask.bag #
############

@convert.register(Iterator, Bag)
def bag_to_iterator(x, **kwargs):
Expand All @@ -88,11 +86,10 @@ def bag_to_iterator(x, **kwargs):

@convert.register(Bag, chunks(TextFile))
def bag_to_iterator(x, **kwargs):
return Bag.from_filenames([tf.path for tf in x])
return db.from_filenames([tf.path for tf in x])


@convert.register(Bag, list)
def bag_to_iterator(x, **kwargs):
keys = keywords(Bag.from_sequence)
kwargs2 = dict((k, v) for k, v in kwargs.items() if k in keys)
return Bag.from_sequence(x, **kwargs2)
kwargs = keyfilter(keywords(db.from_sequence).__contains__, kwargs)
return db.from_sequence(x, **kwargs)
113 changes: 0 additions & 113 deletions odo/backends/tests/test_dask.py

This file was deleted.

76 changes: 76 additions & 0 deletions odo/backends/tests/test_dask_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import absolute_import, division, print_function

import pytest

pytest.importorskip('dask')

from toolz import merge
from odo.backends.dask import append
from dask.array.core import insert_to_ooc, Array
from dask import core
from odo import convert, into, odo
from odo.utils import tmpfile
import numpy as np


def eq(a, b):
    """Return whether ``a`` and ``b`` compare equal.

    ``a == b`` is evaluated once.  When the result is a NumPy array
    (an element-wise comparison) it is reduced with ``.all()`` so the
    helper yields a single truth value; any other comparison result is
    returned unchanged.
    """
    c = a == b
    if isinstance(c, np.ndarray):
        # Element-wise result: equal only if every element matches.
        c = c.all()
    return c


def test_convert():
    """Converting a NumPy array with ``chunks`` yields a dask ``Array``."""
    source = np.arange(600).reshape((20, 30))
    dask_arr = convert(Array, source, chunks=(4, 5))
    assert isinstance(dask_arr, Array)


def test_convert_to_numpy_array():
    """A NumPy array survives a round trip through a dask ``Array``."""
    expected = np.arange(600).reshape((20, 30))
    dask_arr = convert(Array, expected, chunks=(4, 5))
    roundtrip = convert(np.ndarray, dask_arr)
    assert eq(expected, roundtrip)


def test_append_to_array():
    """Appending a dask ``Array`` fills a bcolz array and an HDF5 target.

    Skipped when ``bcolz`` is not importable.
    """
    bcolz = pytest.importorskip('bcolz')
    x = np.arange(600).reshape((20, 30))
    a = into(Array, x, chunks=(4, 5))
    # Zero rows to start with, so the target has to grow to hold ``a``.
    b = bcolz.zeros(shape=(0, 30), dtype=x.dtype)

    append(b, a)
    assert eq(b[:], x)

    with tmpfile('.hdf5') as fn:
        h = odo(a, fn + '::/data')
        try:
            assert eq(h[:], x)
        finally:
            # Close the underlying file even when the assertion fails, so
            # the handle is not left open while the temp file is cleaned up.
            h.file.close()


def test_into_inplace():
    """``append(..., inplace=True)`` writes into an already-sized target.

    Skipped when ``bcolz`` is not importable.
    """
    bcolz = pytest.importorskip('bcolz')
    source = np.arange(600).reshape((20, 30))
    dask_arr = into(Array, source, chunks=(4, 5))
    # Target already has the full (20, 30) shape of the source.
    target = bcolz.zeros(shape=(20, 30), dtype=source.dtype)

    append(target, dask_arr, inplace=True)
    assert eq(target[:], source)


def test_insert_to_ooc():
    """Running the ``insert_to_ooc`` graph copies the data into ``target``."""
    source = np.arange(600).reshape((20, 30))
    target = np.empty(shape=source.shape, dtype=source.dtype)
    dask_arr = convert(Array, source, chunks=(4, 5))

    store_graph = insert_to_ooc(target, dask_arr)
    # The store tasks are executed together with the array's own graph.
    full_graph = merge(store_graph, dask_arr.dask)
    core.get(full_graph, list(store_graph.keys()))

    assert eq(target, source)


def test_array_interface():
    """``np.array`` applied to a dask ``Array`` reproduces the source data."""
    source = np.arange(600).reshape((20, 30))
    dask_arr = convert(Array, source, chunks=(4, 5))
    materialized = np.array(dask_arr)

    assert eq(source, materialized)
38 changes: 38 additions & 0 deletions odo/backends/tests/test_dask_bag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

pytest.importorskip('dask.bag')

from odo import chunks, TextFile, odo
from dask.bag import Bag
from odo.utils import filetexts


def inc(x):
    """Return ``x + 1``."""
    return x + 1


# Task graph with three tasks keyed ('x', 0) .. ('x', 2), each ``(range, 5)``.
dsk = {('x', part): (range, 5) for part in range(3)}

# Expected flattened contents: 0..4 repeated once per task.
L = 3 * list(range(5))

# Bag built from the graph above under the name 'x' with 3 partitions.
b = Bag(dsk, 'x', 3)


def test_convert_bag_to_list():
    """Converting the module-level bag to a list gives the expected items."""
    result = odo(b, list)
    assert result == L


def test_convert_logfiles_to_bag():
    """Chunks of ``TextFile`` convert to a ``Bag`` that references the files."""
    contents = {'a1.log': 'Hello\nWorld', 'a2.log': 'Hola\nMundo'}
    with filetexts(contents) as fns:
        text_files = [TextFile(fn) for fn in fns]
        logs = chunks(TextFile)(text_files)
        bag = odo(logs, Bag)

        assert isinstance(bag, Bag)
        # The file name should appear somewhere in the bag's task graph.
        assert 'a1.log' in str(bag.dask.values())
        assert odo(bag, list) == odo(logs, list)


def test_sequence():
    """A plain list converts to a ``Bag`` whose elements can be mapped over."""
    bag = odo([1, 2, 3], Bag)
    incremented = set(bag.map(inc))
    assert incremented == {2, 3, 4}

0 comments on commit fa842fd

Please sign in to comment.