Skip to content

Commit

Permalink
Fix odo breakage from dask API changes
Browse files (browse the repository at this point in the history)
  • Loading branch information
cpcloud committed Apr 23, 2015
1 parent 7db40e9 commit e85a0dc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 27 deletions.
45 changes: 21 additions & 24 deletions odo/backends/dask.py
Original file line number | Diff line number | Diff line change
@@ -1,23 +1,19 @@
from __future__ import absolute_import, division, print_function
from operator import add
from collections import Iterator

import numpy as np
from toolz import merge, accumulate
from toolz import keyfilter
from datashape.dispatch import dispatch
from datashape import DataShape

from dask.array.core import rec_concatenate, Array, getem, get, names, from_array
from dask.array.core import rec_concatenate, Array, get, from_array
from dask.bag.core import Bag
from dask.core import flatten
import dask.bag as db
from dask.compatibility import long

from odo import append, chunks, convert, discover, into, TextFile
from odo import append, chunks, convert, discover, TextFile
from ..utils import keywords

##############
# dask.Array #
##############

@discover.register(Array)
def discover_dask_array(a, **kwargs):
Expand All @@ -26,8 +22,12 @@ def discover_dask_array(a, **kwargs):


arrays = [np.ndarray]

try:
import h5py
except ImportError:
pass
else:
arrays.append(h5py.Dataset)

@dispatch(h5py.Dataset, (int, long))
Expand All @@ -39,24 +39,24 @@ def resize(x, size):
@dispatch(h5py.Dataset, tuple)
def resize(x, shape):
return x.resize(shape)
except ImportError:
pass

try:
import bcolz
except ImportError:
pass
else:
arrays.append(bcolz.carray)

@dispatch(bcolz.carray, (int, long))
def resize(x, size):
return x.resize(size)
except ImportError:
pass


@convert.register(Array, tuple(arrays), cost=1.)
def array_to_dask(x, name=None, blockshape=None, **kwargs):
if blockshape is None:
raise NotImplementedError("blockshape cannot be None")
return from_array(x, blockshape=blockshape, name=name, **kwargs)
def array_to_dask(x, name=None, chunks=None, **kwargs):
if chunks is None:
raise ValueError("chunks cannot be None")
return from_array(x, chunks=chunks, name=name, **kwargs)


@convert.register(np.ndarray, Array, cost=10.)
Expand All @@ -75,11 +75,9 @@ def store_Array_in_ooc_data(out, arr, inplace=False, **kwargs):
# Resize output dataset to accept new data
assert out.shape[1:] == arr.shape[1:]
resize(out, out.shape[0] + arr.shape[0]) # elongate
return arr.store(out)
arr.store(out)
return out

############
# dask.bag #
############

@convert.register(Iterator, Bag)
def bag_to_iterator(x, **kwargs):
Expand All @@ -88,11 +86,10 @@ def bag_to_iterator(x, **kwargs):

@convert.register(Bag, chunks(TextFile))
def bag_to_iterator(x, **kwargs):
return Bag.from_filenames([tf.path for tf in x])
return db.from_filenames([tf.path for tf in x])


@convert.register(Bag, list)
def bag_to_iterator(x, **kwargs):
keys = keywords(Bag.from_sequence)
kwargs2 = dict((k, v) for k, v in kwargs.items() if k in keys)
return Bag.from_sequence(x, **kwargs2)
kwargs = keyfilter(keywords(db.from_sequence).__contains__, kwargs)
return db.from_sequence(x, **kwargs)
6 changes: 3 additions & 3 deletions odo/backends/tests/test_dask_array.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,7 +8,7 @@
from odo.backends.dask import append
from dask.array.core import insert_to_ooc, Array
from dask import core
from odo import convert, into
from odo import convert, into, odo
from odo.utils import tmpfile
import numpy as np

Expand Down Expand Up @@ -42,8 +42,8 @@ def test_append_to_array():
append(b, a)
assert eq(b[:], x)

with tmpfile('hdf5') as fn:
h = into(fn + '::/data', a)
with tmpfile('.hdf5') as fn:
h = odo(a, fn + '::/data')
assert eq(h[:], x)
h.file.close()

Expand Down

0 comments on commit e85a0dc

Please sign in to comment.