From 23e87b71b32399fb816e445d2a193e7609a1cf7f Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 11 Jan 2025 16:14:47 -0600 Subject: [PATCH 01/16] Compatiblity with dask 2025.1.0 This updates dask-geopandas to be compatible with the next release of dask, which moves dask-expr into dask.dataframe. I've dropped support for all older versions of dask. Only new versions, based on dask-expr / with query planning, are supported. --- dask_geopandas/__init__.py | 50 +- dask_geopandas/backends.py | 19 - dask_geopandas/clip.py | 20 +- dask_geopandas/core.py | 1069 +---------------------------- dask_geopandas/expr.py | 34 +- dask_geopandas/io/arrow.py | 28 +- dask_geopandas/io/file.py | 25 +- dask_geopandas/io/parquet.py | 20 +- dask_geopandas/sjoin.py | 34 +- dask_geopandas/tests/test_core.py | 34 +- pyproject.toml | 3 +- 11 files changed, 96 insertions(+), 1240 deletions(-) diff --git a/dask_geopandas/__init__.py b/dask_geopandas/__init__.py index ec8867e..13c0c82 100644 --- a/dask_geopandas/__init__.py +++ b/dask_geopandas/__init__.py @@ -1,58 +1,42 @@ from ._version import get_versions -from . import backends - -if backends.QUERY_PLANNING_ON: - from .expr import ( - points_from_xy, - from_wkt, - from_wkb, - GeoDataFrame, - GeoSeries, - from_geopandas, - from_dask_dataframe, - ) -else: - from .core import ( - points_from_xy, - from_wkt, - from_wkb, - GeoDataFrame, - GeoSeries, - from_geopandas, - from_dask_dataframe, - ) +from .expr import ( + points_from_xy, + from_wkt, + from_wkb, + GeoDataFrame, + GeoSeries, + from_geopandas, + from_dask_dataframe, +) from .io.file import read_file from .io.parquet import read_parquet, to_parquet from .io.arrow import read_feather, to_feather from .clip import clip from .sjoin import sjoin +from . import backends as _ # needed to register dispatch functions with dask __version__ = get_versions()["version"] del get_versions __all__ = [ - "points_from_xy", - "from_wkt", - "from_wkb", "GeoDataFrame", "GeoSeries", - "from_geopandas", + "clip", "from_dask_dataframe", - "read_file", + "from_geopandas", + "from_wkb", + "from_wkt", + "points_from_xy", "read_feather", + "read_file", "read_parquet", + "sjoin", "to_feather", "to_parquet", - "clip", - "sjoin", ] from . import _version __version__ = _version.get_versions()["version"] - -from . 
import _version - -__version__ = _version.get_versions()["version"] diff --git a/dask_geopandas/backends.py b/dask_geopandas/backends.py index 48eeb8f..1a6919e 100644 --- a/dask_geopandas/backends.py +++ b/dask_geopandas/backends.py @@ -4,24 +4,6 @@ import pandas as pd import dask -from dask import config - -# Check if dask-dataframe is using dask-expr (mimix the logic of dask.dataframe -# _dask_expr_enabled() - default of None means True as well if dask-expr is available) -QUERY_PLANNING_ON = config.get("dataframe.query-planning", False) -if QUERY_PLANNING_ON is None: - if Version(pd.__version__).major < 2: - QUERY_PLANNING_ON = False - else: - try: - import dask_expr # noqa: F401 - except ImportError: - # dask will raise error or warning depending on the config - QUERY_PLANNING_ON = False - else: - QUERY_PLANNING_ON = True - - from dask.base import normalize_token from dask.dataframe.backends import _nonempty_index, meta_nonempty_dataframe from dask.dataframe.core import get_parallel_type @@ -85,7 +67,6 @@ def tokenize_geometryarray(x): @pyarrow_schema_dispatch.register((geopandas.GeoDataFrame,)) def get_pyarrow_schema_geopandas(obj): - import pandas as pd import pyarrow as pa df = pd.DataFrame(obj.copy()) diff --git a/dask_geopandas/clip.py b/dask_geopandas/clip.py index 93be39d..2ace9bf 100644 --- a/dask_geopandas/clip.py +++ b/dask_geopandas/clip.py @@ -1,21 +1,16 @@ import numpy as np from dask.base import tokenize +from dask.dataframe import from_graph from dask.highlevelgraph import HighLevelGraph from dask.utils import derived_from import geopandas -from . import backends - @derived_from(geopandas.tools) def clip(gdf, mask, keep_geom_type=False): - - if backends.QUERY_PLANNING_ON: - from .expr import GeoDataFrame, GeoSeries - else: - from .core import GeoDataFrame, GeoSeries + from dask_geopandas import GeoDataFrame, GeoSeries if isinstance(mask, (GeoDataFrame, GeoSeries)): raise NotImplementedError("Mask cannot be a Dask GeoDataFrame or GeoSeries.") @@ -45,17 +40,8 @@ def clip(gdf, mask, keep_geom_type=False): } divisions = [None] * (len(dsk) + 1) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[gdf]) - if backends.QUERY_PLANNING_ON: - from dask_expr import from_graph - - result = from_graph(graph, gdf._meta, tuple(divisions), dsk.keys(), "clip") - else: - from .core import GeoDataFrame, GeoSeries - if isinstance(gdf, GeoDataFrame): - result = GeoDataFrame(graph, name, gdf._meta, tuple(divisions)) - elif isinstance(gdf, GeoSeries): - result = GeoSeries(graph, name, gdf._meta, tuple(divisions)) + result = from_graph(graph, gdf._meta, tuple(divisions), dsk.keys(), "clip") result.spatial_partitions = new_spatial_partitions return result diff --git a/dask_geopandas/core.py b/dask_geopandas/core.py index 4ef455c..86b3ee4 100644 --- a/dask_geopandas/core.py +++ b/dask_geopandas/core.py @@ -1,1066 +1,9 @@ import warnings -from packaging.version import Version -import numpy as np -import pandas as pd +from .expr import * # noqa: F403 -import dask -import dask.array as da -import dask.dataframe as dd -from dask.base import tokenize -from dask.dataframe.core import _emulate, elemwise, map_partitions, new_dd_object -from dask.highlevelgraph import HighLevelGraph -from dask.utils import M, OperatorMethodMixin, derived_from, ignore_warning - -import geopandas -import shapely -from shapely.geometry import box -from shapely.geometry.base import BaseGeometry - -import dask_geopandas - -from .geohash import _geohash -from .hilbert_distance import _hilbert_distance -from 
.morton_distance import _morton_distance - -DASK_2022_8_1 = Version(dask.__version__) >= Version("2022.8.1") -GEOPANDAS_0_12 = Version(geopandas.__version__) >= Version("0.12.0") -GEOPANDAS_1_0 = Version(geopandas.__version__) >= Version("1.0.0a0") -PANDAS_2_0_0 = Version(pd.__version__) >= Version("2.0.0") - - -if Version(shapely.__version__) < Version("2.0"): - raise ImportError( - "Running dask-geopandas requires Shapely >= 2.0 to be " - "installed. However, Shapely is only at " - f"version {shapely.__version__}" - ) - - -def _set_crs(df, crs, allow_override): - """Return a new object with crs set to ``crs``""" - return df.set_crs(crs, allow_override=allow_override) - - -class _Frame(dd.core._Frame, OperatorMethodMixin): - """Superclass for DataFrame and Series - - Parameters - ---------- - dsk : dict - The dask graph to compute this DataFrame - name : str - The key prefix that specifies which keys in the dask comprise this - particular DataFrame / Series - meta : geopandas.GeoDataFrame, geopandas.GeoSeries - An empty geopandas object with names, dtypes, and indices matching the - expected output. - divisions : tuple of index values - Values along which we partition our blocks on the index - """ - - _partition_type = geopandas.base.GeoPandasBase - - def __init__(self, dsk, name, meta, divisions, spatial_partitions=None): - super().__init__(dsk, name, meta, divisions) - self.spatial_partitions = spatial_partitions - - def to_dask_dataframe(self): - """Create a dask.dataframe object from a dask_geopandas object""" - return self.map_partitions(pd.DataFrame) - - def __dask_postpersist__(self): - return self._rebuild, () - - def _rebuild(self, dsk, *, rename=None): - # this is a copy of the dask.dataframe version, only with the addition - # to pass self.spatial_partitions - name = self._name - if rename: - name = rename.get(name, name) - return type(self)( - dsk, name, self._meta, self.divisions, self.spatial_partitions - ) - - @property - def spatial_partitions(self): - """ - The spatial extent of each of the partitions of the dask GeoDataFrame. - """ - return self._spatial_partitions - - @spatial_partitions.setter - def spatial_partitions(self, value): - if value is not None: - if not isinstance(value, geopandas.GeoSeries): - raise TypeError( - "Expected a geopandas.GeoSeries for the spatial_partitions, " - f"got {type(value)} instead." - ) - if len(value) != self.npartitions: - raise ValueError( - f"Expected spatial partitions of length {self.npartitions}, " - f"got {len(value)} instead." - ) - self._spatial_partitions = value - - @property - def _args(self): - # Ensure we roundtrip through pickle correctly - # https://github.com/geopandas/dask-geopandas/issues/237 - return super()._args + (self.spatial_partitions,) - - def __setstate__(self, state): - *dask_state, spatial_partitions = state - super().__setstate__(dask_state) - self.spatial_partitions = spatial_partitions - - @classmethod - def _bind_property(cls, attr, preserve_spatial_partitions=False): - """Map property to partitions and bind to class""" - - def prop(self): - meta = getattr(self._meta, attr) - result = self.map_partitions(getattr, attr, token=attr, meta=meta) - if preserve_spatial_partitions: - result = self._propagate_spatial_partitions(result) - return result - - doc = getattr(cls._partition_type, attr).__doc__ - # Insert disclaimer that this is a copied docstring note that - # malformed docs will not get the disclaimer (see #4746). 
- if doc: - doc = ignore_warning(doc, cls._partition_type, attr) - setattr(cls, name, property(fget=prop, doc=doc)) - - @classmethod - def _bind_elemwise_comparison_method( - cls, name, comparison, original, *args, **kwargs - ): - """bind comparison method like GeoSeries.contains to this class""" - - def meth(self, other, *args, **kwargs): - return elemwise(comparison, self, other, *args, **kwargs) - - meth.__name__ = name - setattr(cls, name, derived_from(original)(meth)) - - @classmethod - def _bind_elemwise_operator_method(cls, name, op, original, *args, **kwargs): - """bind operator method like GeoSeries.distance to this class""" - - # name must be explicitly passed for div method whose name is truediv - def meth(self, other, *args, **kwargs): - meta = _emulate(op, self, other) - return map_partitions( - op, self, other, *args, meta=meta, enforce_metadata=False, **kwargs - ) - - meth.__name__ = name - setattr(cls, name, derived_from(original)(meth)) - - def calculate_spatial_partitions(self): - """Calculate spatial partitions""" - # TEMP method to calculate spatial partitions for testing, need to - # add better methods (set_partitions / repartition) - parts = geopandas.GeoSeries( - self.map_partitions( - lambda part: shapely.convex_hull( - shapely.geometrycollections(np.asarray(part.geometry)) - ) - ).compute(), - crs=self.crs, - ) - self.spatial_partitions = parts - - def _propagate_spatial_partitions(self, new_object): - """ - We need to override several dask methods to ensure the spatial - partitions are properly propagated. - This is a helper method to set this. - """ - new_object.spatial_partitions = self.spatial_partitions - return new_object - - @property - @derived_from(geopandas.GeoSeries) - def crs(self): - """ - The Coordinate Reference System (CRS) represented as a ``pyproj.CRS`` - object. - - Returns None if the CRS is not set, and to set the value it - :getter: Returns a ``pyproj.CRS`` or None. When setting, the value - can be anything accepted by :meth:`pyproj.CRS.from_user_input`, - such as an authority string (eg "EPSG:4326") or a WKT string. - """ - return self._meta.crs - - @crs.setter - def crs(self, value): - """Sets the value of the crs""" - # When using setter, Geopandas always overrides the CRS - new = self.set_crs(value, allow_override=True) - self._meta = new._meta - self._name = new._name - self.dask = new.dask - - @derived_from(geopandas.GeoSeries) - def set_crs(self, value, allow_override=False): - """Set the value of the crs on a new object""" - new = self.map_partitions( - _set_crs, value, allow_override, enforce_metadata=False - ) - if self.spatial_partitions is not None: - new.spatial_partitions = self.spatial_partitions.set_crs( - value, allow_override=allow_override - ) - return new - - @derived_from(geopandas.GeoSeries) - def to_crs(self, crs=None, epsg=None): - return self.map_partitions(M.to_crs, crs=crs, epsg=epsg) - - def copy(self): - """Make a copy of the dataframe - - Creates shallow copies of the computational graph and spatial partitions. - Does not affect the underlying data. 
- """ - self_copy = super().copy() - if self.spatial_partitions is not None: - self_copy.spatial_partitions = self.spatial_partitions.copy() - return self_copy - - @property - @derived_from(geopandas.base.GeoPandasBase) - def total_bounds(self): - def agg(concatted): - return np.array( - ( - np.nanmin(concatted[0::4]), # minx - np.nanmin(concatted[1::4]), # miny - np.nanmax(concatted[2::4]), # maxx - np.nanmax(concatted[3::4]), # maxy - ) - ) - - total_bounds = self.reduction( - lambda x: getattr(x, "total_bounds"), - token="total_bounds", - meta=self._meta.total_bounds, - aggregate=agg, - ) - return da.Array( - total_bounds.dask, - total_bounds.name, - chunks=((4,),), - dtype=total_bounds.dtype, - ) - - @property - def sindex(self): - """Need to figure out how to concatenate spatial indexes""" - raise NotImplementedError - - @property - @derived_from(geopandas.base.GeoPandasBase) - def unary_union(self): - warnings.warn( - "The 'unary_union' attribute is deprecated, " - "use the 'union_all()' method instead.", - FutureWarning, - stacklevel=2, - ) - if GEOPANDAS_1_0: - return self.union_all() - else: - return self._unary_union() - - def _unary_union(self): - attr = "unary_union" - meta = BaseGeometry() - - return self.reduction( - lambda x: getattr(x, attr), - token=attr, - aggregate=lambda x: getattr(geopandas.GeoSeries(x), attr), - meta=meta, - ) - - def union_all(self): - if not GEOPANDAS_1_0: - return self._unary_union() - - attr = "union_all" - meta = BaseGeometry() - - return self.reduction( - lambda x: x.union_all(), - token=attr, - aggregate=lambda x: geopandas.GeoSeries(x).union_all(), - meta=meta, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def representative_point(self): - return self.map_partitions( - self._partition_type.representative_point, enforce_metadata=False - ) - - @derived_from(geopandas.base.GeoPandasBase) - def geom_equals_exact(self, other, tolerance): - comparison = self._partition_type.geom_equals_exact - return elemwise(comparison, self, other, tolerance) - - @derived_from(geopandas.base.GeoPandasBase) - def buffer(self, distance, resolution=16, **kwargs): - return self.map_partitions( - self._partition_type.buffer, - distance, - resolution=resolution, - enforce_metadata=False, - **kwargs, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def simplify(self, *args, **kwargs): - return self.map_partitions( - self._partition_type.simplify, *args, enforce_metadata=False, **kwargs - ) - - @derived_from(geopandas.base.GeoPandasBase) - def interpolate(self, distance, normalized=False): - return self.map_partitions( - self._partition_type.interpolate, - distance, - normalized=normalized, - enforce_metadata=False, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def affine_transform(self, matrix): - return self.map_partitions( - self._partition_type.affine_transform, matrix, enforce_metadata=False - ) - - @derived_from(geopandas.base.GeoPandasBase) - def translate(self, xoff=0.0, yoff=0.0, zoff=0.0): - return self.map_partitions( - self._partition_type.translate, - xoff=xoff, - yoff=yoff, - zoff=zoff, - enforce_metadata=False, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def rotate(self, angle, origin="center", use_radians=False): - return self.map_partitions( - self._partition_type.rotate, - angle, - origin=origin, - use_radians=use_radians, - enforce_metadata=False, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def scale(self, xfact=1.0, yfact=1.0, zfact=1.0, origin="center"): - return self.map_partitions( - 
self._partition_type.scale, - xfact=xfact, - yfact=yfact, - zfact=zfact, - origin=origin, - enforce_metadata=False, - ) - - @derived_from(geopandas.base.GeoPandasBase) - def skew(self, xs=0.0, ys=0.0, origin="center", use_radians=False): - return self.map_partitions( - self._partition_type.skew, - xs=xs, - ys=ys, - origin=origin, - use_radians=use_radians, - enforce_metadata=False, - ) - - @property - @derived_from(geopandas.geodataframe.GeoDataFrame) - def cx(self): - """ - Coordinate based indexer to select by intersection with bounding box. - - Format of input should be ``.cx[xmin:xmax, ymin:ymax]``. Any of - ``xmin``, ``xmax``, ``ymin``, and ``ymax`` can be provided, but input - must include a comma separating x and y slices. That is, ``.cx[:, :]`` - will return the full series/frame, but ``.cx[:]`` is not implemented. - """ - return _CoordinateIndexer(self) - - def hilbert_distance(self, total_bounds=None, level=16): - """ - Calculate the distance along a Hilbert curve. - - The distances are calculated for the midpoints of the geometries in the - GeoDataFrame, and using the total bounds of the GeoDataFrame. - - The Hilbert distance can be used to spatially partition Dask-GeoPandas - objects, by mapping two dimensional geometries along the Hilbert curve. - - Parameters - ---------- - total_bounds : 4-element array, optional - The spatial extent in which the curve is constructed (used to - rescale the geometry midpoints). By default, the total bounds - of the full dask GeoDataFrame will be computed (from the spatial - partitions, if available, otherwise computed from the full - dataframe). If known, you can pass the total bounds to avoid this - extra computation. - level : int (1 - 16), default 16 - Determines the precision of the curve (points on the curve will - have coordinates in the range [0, 2^level - 1]). - - Returns - ------- - dask.Series - Series containing distances for each partition - - """ - # Compute total bounds of all partitions rather than each partition - if total_bounds is None: - if self.spatial_partitions is not None: - total_bounds = self.spatial_partitions.total_bounds - else: - total_bounds = self.total_bounds - - # Calculate hilbert distances for each partition - distances = self.map_partitions( - _hilbert_distance, - total_bounds=total_bounds, - level=level, - meta=pd.Series([], name="hilbert_distance", dtype="uint32"), - ) - - return distances - - def morton_distance(self, total_bounds=None, level=16): - """ - Calculate the distance of geometries along the Morton curve - - The Morton curve is also known as Z-order https://en.wikipedia.org/wiki/Z-order. - - The Morton distance can be used to spatially partition Dask-GeoPandas objects, - by mapping two-dimensional geometries along the Morton space-filing curve. - - Each geometry is represented by the midpoint of its bounds and linked to the - Morton curve. The function returns a distance from the beginning - of the curve to the linked point. - - Morton distance is more performant than ``hilbert_distance`` but can result in - less optimal partitioning. - - Parameters - ---------- - - total_bounds : 4-element array, optional - The spatial extent in which the curve is constructed (used to - rescale the geometry midpoints). By default, the total bounds - of the full dask GeoDataFrame will be computed (from the spatial - partitions, if available, otherwise computed from the full - dataframe). If known, you can pass the total bounds to avoid this - extra computation. 
- level : int (1 - 16), default 16 - Determines the precision of the Morton curve. - - Returns - ------- - dask.Series - Series containing distances along the Morton curve - - """ - # Compute total bounds of all partitions rather than each partition - if total_bounds is None: - if self.spatial_partitions is not None: - total_bounds = self.spatial_partitions.total_bounds - else: - total_bounds = self.total_bounds - - # Calculate Morton distances for each partition - distances = self.map_partitions( - _morton_distance, - total_bounds=total_bounds, - level=level, - meta=pd.Series([], name="morton_distance", dtype="uint32"), - ) - - return distances - - def geohash(self, as_string=True, precision=12): - """ - Calculate geohash based on the middle points of the geometry bounds - for a given precision. - Only geographic coordinates (longitude, latitude) are supported. - - Parameters - ---------- - as_string : bool, default True - To return string or int Geohash. - precision : int (1 - 12), default 12 - Precision of the string geohash values. Only used when - ``as_string=True``. - - Returns - ------- - type : pandas.Series - Series containing Geohash - """ - - if precision not in range(1, 13): - raise ValueError( - "The Geohash precision only accepts an integer value between 1 and 12" - ) - - if as_string is True: - dtype = object - else: - dtype = np.uint64 - - geohashes = self.map_partitions( - _geohash, - as_string=as_string, - precision=precision, - meta=pd.Series([], name="geohash", dtype=dtype), - ) - - return geohashes - - @derived_from(geopandas.GeoDataFrame) - def clip(self, mask, keep_geom_type=False): - return dask_geopandas.clip(self, mask=mask, keep_geom_type=keep_geom_type) - - @derived_from(geopandas.GeoDataFrame) - def to_wkt(self, **kwargs): - meta = self._meta.to_wkt(**kwargs) - return self.map_partitions(M.to_wkt, **kwargs, meta=meta) - - @derived_from(geopandas.GeoDataFrame) - def to_wkb(self, hex=False, **kwargs): - meta = self._meta.to_wkb(hex=hex, **kwargs) - return self.map_partitions(M.to_wkb, hex=hex, **kwargs, meta=meta) - - -class GeoSeries(_Frame, dd.core.Series): - """Parallel GeoPandas GeoSeries - - Do not use this class directly. Instead use functions like - :func:`dask_geopandas.read_parquet`,or :func:`dask_geopandas.from_geopandas`. - """ - - _partition_type = geopandas.GeoSeries - - @derived_from(geopandas.GeoSeries) - def explode(self, ignore_index=False, index_parts=None): - return self.map_partitions( - M.explode, - ignore_index=ignore_index, - index_parts=index_parts, - enforce_metadata=False, - ) - - -class GeoDataFrame(_Frame, dd.core.DataFrame): - """Parallel GeoPandas GeoDataFrame - - Do not use this class directly. Instead use functions like - :func:`dask_geopandas.read_parquet`,or :func:`dask_geopandas.from_geopandas`. - """ - - _partition_type = geopandas.GeoDataFrame - - @property - def geometry(self): - geometry_column_name = self._meta._geometry_column_name - if geometry_column_name not in self.columns: - raise AttributeError( - "No geometry data set yet (expected in" - " column '%s'." 
% geometry_column_name - ) - return self[geometry_column_name] - - @geometry.setter - def geometry(self, col): - """Sets the geometry column""" - new = self.set_geometry(col) - self._meta = new._meta - self._name = new._name - self.dask = new.dask - - @derived_from(dd.DataFrame) - def set_index(self, *args, **kwargs): - """Override to ensure we get GeoDataFrame with set geometry column""" - ddf = super().set_index(*args, **kwargs) - return ddf.set_geometry(self._meta.geometry.name) - - @derived_from(geopandas.GeoDataFrame) - def set_geometry(self, col): - # calculate ourselves to use meta and not meta_nonempty, which would - # raise an error if meta is an invalid GeoDataFrame (e.g. geometry - # column name not yet set correctly) - if isinstance(col, GeoSeries): - meta = self._meta.set_geometry(col._meta) - else: - meta = self._meta.set_geometry(col) - return self.map_partitions(M.set_geometry, col, meta=meta) - - @derived_from(geopandas.GeoDataFrame) - def rename_geometry(self, col): - meta = self._meta.rename_geometry(col) - return self.map_partitions(M.rename_geometry, col, meta=meta) - - def __getitem__(self, key): - """ - If the result is a new dask_geopandas.GeoDataFrame/GeoSeries (automatically - determined by dask based on the meta), then pass through the spatial - partitions information. - """ - result = super().__getitem__(key) - if isinstance(result, _Frame): - result = self._propagate_spatial_partitions(result) - return result - - def _repr_html_(self): - output = super()._repr_html_() - return output.replace( - "Dask DataFrame Structure", "Dask-GeoPandas GeoDataFrame Structure" - ) - - @derived_from(dd.DataFrame) - def to_parquet(self, path, *args, **kwargs): - """See dask_geopadandas.to_parquet docstring for more information""" - from .io.parquet import to_parquet - - return to_parquet(self, path, *args, **kwargs) - - def to_feather(self, path, *args, **kwargs): - """See dask_geopadandas.to_feather docstring for more information""" - from .io.arrow import to_feather - - return to_feather(self, path, *args, **kwargs) - - def dissolve(self, by=None, aggfunc="first", split_out=1, **kwargs): - """Dissolve geometries within ``groupby`` into a single geometry. - - Parameters - ---------- - by : string, default None - Column whose values define groups to be dissolved. If None, - whole GeoDataFrame is considered a single group. - aggfunc : function, string or dict, default "first" - Aggregation function for manipulation of data associated - with each group. Passed to dask ``groupby.agg`` method. - Note that ``aggfunc`` needs to be applicable to all columns (i.e. ``"mean"`` - cannot be used with string dtype). Select only required columns before - ``dissolve`` or pass a dictionary mapping to ``aggfunc`` to specify the - aggregation function for each column separately. 
- split_out : int, default 1 - Number of partitions of the output - - **kwargs - keyword arguments passed to ``groupby`` - - Examples - -------- - >>> ddf.dissolve("foo", split_out=12) - - >>> ddf[["foo", "bar", "geometry"]].dissolve("foo", aggfunc="mean") - - >>> ddf.dissolve("foo", aggfunc={"bar": "mean", "baz": "first"}) - - """ - if by is None: - by = lambda x: 0 - drop = [self.geometry.name] - else: - drop = [by, self.geometry.name] - - def union(block): - if GEOPANDAS_1_0: - merged_geom = block.union_all() - else: - merged_geom = block.unary_union - return merged_geom - - merge_geometries = dd.Aggregation( - "merge_geometries", lambda s: s.agg(union), lambda s0: s0.agg(union) - ) - if isinstance(aggfunc, dict): - data_agg = aggfunc - else: - data_agg = {col: aggfunc for col in self.columns.drop(drop)} - data_agg[self.geometry.name] = merge_geometries - # dask 2022.8.1 added shuffle keyword, enabled by default if split_out > 1 - # starting with dask 2022.9.1, but geopandas doesn't yet work with shuffle - # https://github.com/geopandas/dask-geopandas/pull/229 - agg_kwargs = {"shuffle": False} if DASK_2022_8_1 else {} - aggregated = self.groupby(by=by, **kwargs).agg( - data_agg, split_out=split_out, **agg_kwargs - ) - return aggregated.set_crs(self.crs) - - def sjoin(self, df, how="inner", predicate="intersects"): - """ - Spatial join of two GeoDataFrames. - - Parameters - ---------- - df : geopandas or dask_geopandas GeoDataFrame - If a geopandas.GeoDataFrame is passed, it is considered as a - dask_geopandas.GeoDataFrame with 1 partition (without spatial - partitioning information). - how : string, default 'inner' - The type of join. Currently only 'inner' is supported. - predicate : string, default 'intersects' - Binary predicate how to match corresponding rows of the left and right - GeoDataFrame. Possible values: 'contains', 'contains_properly', - 'covered_by', 'covers', 'crosses', 'intersects', 'overlaps', - 'touches', 'within'. - - Returns - ------- - dask_geopandas.GeoDataFrame - - Notes - ----- - If both the left and right GeoDataFrame have spatial partitioning - information available (the ``spatial_partitions`` attribute is set), - the output partitions are determined based on intersection of the - spatial partitions. In all other cases, the output partitions are - all combinations (cartesian/cross product) of all input partition - of the left and right GeoDataFrame. - """ - return dask_geopandas.sjoin(self, df, how=how, predicate=predicate) - - def spatial_shuffle( - self, - by="hilbert", - level=None, - calculate_partitions=True, - npartitions=None, - divisions=None, - **kwargs, - ): - """ - Shuffle the data into spatially consistent partitions. - - This realigns the dataset to be spatially sorted, i.e. geometries that are - spatially near each other will be within the same partition. This is - useful especially for overlay operations like a spatial join as it reduces the - number of interactions between individual partitions. - - The spatial information is stored in the index and will replace the existing - index. - - Note that ``spatial_shuffle`` uses ``set_index`` under the hood and comes with - all its potential performance drawbacks. - - Parameters - ---------- - by : string (default 'hilbert') - Spatial sorting method, one of {'hilbert', 'morton', 'geohash'}. See - ``hilbert_distance``, ``morton_distance`` and ``geohash`` methods for - details. - level : int (default None) - Level (precision) of the Hilbert and Morton - curves used as a sorting method. Defaults to 16. 
Does not have an effect for - the ``'geohash'`` option. - calculate_partitions : bool (default True) - Calculate new spatial partitions after shuffling - npartitions : int, None, or 'auto' - The ideal number of output partitions. If None, use the same as the input. - If 'auto' then decide by memory use. Only used when divisions is not given. - If divisions is given, the number of output partitions will be - len(divisions) - 1. - divisions: list, optional - The “dividing lines” used to split the new index into partitions. Needs to - match the values returned by the sorting method. - **kwargs - Keyword arguments passed to ``set_index``. - - Returns - ------- - dask_geopandas.GeoDataFrame - - Notes - ----- - This method, similarly to ``calculate_spatial_partitions``, is computed - partially eagerly as it needs to calculate the distances for all existing - partitions before it can determine the divisions for the new - spatially-shuffled partitions. - """ - if level is None: - level = 16 - if by == "hilbert": - by = self.hilbert_distance(level=level) - elif by == "morton": - by = self.morton_distance(level=level) - elif by == "geohash": - by = self.geohash(as_string=False) - else: - raise ValueError( - f"'{by}' is not supported. Use one of ['hilbert', 'morton, 'geohash']." - ) - - sorted_ddf = self.set_index( - by, - sorted=False, - npartitions=npartitions, - divisions=divisions, - inplace=False, - **kwargs, - ) - - if calculate_partitions: - sorted_ddf.calculate_spatial_partitions() - - return sorted_ddf - - @derived_from(geopandas.GeoDataFrame) - def explode(self, column=None, ignore_index=False, index_parts=None): - return self.map_partitions( - M.explode, - column=column, - ignore_index=ignore_index, - index_parts=index_parts, - enforce_metadata=False, - ) - - -from_geopandas = dd.from_pandas - - -def from_dask_dataframe(df, geometry=None): - """ - Create GeoDataFrame from dask DataFrame. - - Parameters - ---------- - df : dask DataFrame - geometry : str or array-like, optional - If a string, the column to use as geometry. By default, it will look - for a column named "geometry". If array-like or dask (Geo)Series, - the values will be set as 'geometry' column on the GeoDataFrame. - - """ - # If the geometry column is already a partitioned `_Frame`, we can't refer to - # it via a keyword-argument due to https://github.com/dask/dask/issues/8308. - # Instead, we assign the geometry column using regular dataframe operations, - # then refer to that column by name in `map_partitions`. - if isinstance(geometry, dd.core.Series): - name = geometry.name if geometry.name is not None else "geometry" - return df.assign(**{name: geometry}).map_partitions( - geopandas.GeoDataFrame, geometry=name - ) - return df.map_partitions(geopandas.GeoDataFrame, geometry=geometry) - - -@derived_from(geopandas) -def points_from_xy(df, x="x", y="y", z="z", crs=None): - """Convert dask.dataframe of x and y (and optionally z) values to a GeoSeries.""" - - def func(data, x, y, z): - return geopandas.GeoSeries( - geopandas.points_from_xy( - data[x], data[y], data[z] if z in df.columns else None, crs=crs - ), - index=data.index, - ) - - return df.map_partitions( - func, x, y, z, meta=geopandas.GeoSeries(crs=crs), token="points_from_xy" - ) - - -def from_wkt(wkt, crs=None): - """ - Convert dask.dataframe.Series of WKT objects to a GeoSeries. - - Parameters - ---------- - wkt: dask Series - A dask Series containing WKT objects. - crs: value, optional - Coordinate Reference System of the geometry objects. 
- Can be anything accepted by - :meth:`pyproj.CRS.from_user_input() `, - such as an authority string (eg "EPSG:4326") or a WKT string. - - Returns - ------- - GeoSeries - """ - - def func(data): - return geopandas.GeoSeries.from_wkt(data, index=data.index, crs=crs) - - return wkt.map_partitions(func, meta=geopandas.GeoSeries(), token="from_wkt") - - -def from_wkb(wkb, crs=None): - """ - Convert dask.dataframe.Series of WKB objects to a GeoSeries. - - Parameters - ---------- - wkb: dask Series - A dask Series containing WKB objects. - crs: value, optional - Coordinate Reference System of the geometry objects. - Can be anything accepted by - :meth:`pyproj.CRS.from_user_input() `, - such as an authority string (eg "EPSG:4326") or a WKT string. - - Returns - ------- - GeoSeries - """ - - def func(data): - return geopandas.GeoSeries.from_wkb(data, index=data.index, crs=crs) - - return wkb.map_partitions(func, meta=geopandas.GeoSeries(), token="from_wkb") - - -for name in [ - "area", - "geom_type", - "type", - "length", - "is_valid", - "is_empty", - "is_simple", - "is_ring", - "has_z", - "interiors", - "bounds", -]: - _Frame._bind_property(name) - - -for name in [ - "boundary", - "centroid", - "convex_hull", - "envelope", - "exterior", -]: - # TODO actually calculate envelope / convex_hull of the spatial partitions - # for some of those - _Frame._bind_property(name, preserve_spatial_partitions=True) - - -for name in [ - "geometry", - "x", - "y", - "z", -]: - GeoSeries._bind_property(name) - -for name in [ - "contains", - "geom_equals", - "geom_almost_equals", - "crosses", - "disjoint", - "intersects", - "overlaps", - "touches", - "within", - "covers", - "covered_by", -]: - meth = getattr(geopandas.base.GeoPandasBase, name) - _Frame._bind_elemwise_comparison_method( - name, meth, original=geopandas.base.GeoPandasBase - ) - - -for name in [ - "distance", - "difference", - "symmetric_difference", - "union", - "intersection", - "relate", - "project", -]: - meth = getattr(geopandas.base.GeoPandasBase, name) - _Frame._bind_elemwise_operator_method( - name, meth, original=geopandas.base.GeoPandasBase - ) - - -dd.core.DataFrame.set_geometry = GeoDataFrame.set_geometry - - -# Coodinate indexer (.cx) - - -def _cx_part(df, bbox): - idx = df.intersects(bbox) - return df[idx] - - -class _CoordinateIndexer(object): - def __init__(self, obj): - self.obj = obj - - def __getitem__(self, key): - obj = self.obj - xs, ys = key - # handle numeric values as x and/or y coordinate index - if type(xs) is not slice: - xs = slice(xs, xs) - if type(ys) is not slice: - ys = slice(ys, ys) - if xs.step is not None or ys.step is not None: - raise ValueError("Slice step not supported.") - - if self.obj.spatial_partitions is not None: - xmin, ymin, xmax, ymax = obj.spatial_partitions.total_bounds - bbox = box( - xs.start if xs.start is not None else xmin, - ys.start if ys.start is not None else ymin, - xs.stop if xs.stop is not None else xmax, - ys.stop if ys.stop is not None else ymax, - ) - partition_idx = np.nonzero( - np.asarray(self.obj.spatial_partitions.intersects(bbox)) - )[0] - else: - raise NotImplementedError( - "Not yet implemented if the GeoDataFrame has no known spatial " - "partitions (you can call the 'calculate_spatial_partitions' method " - "to set it)" - ) - - name = "cx-%s" % tokenize(key, self.obj) - - if len(partition_idx): - # construct graph (based on LocIndexer from dask) - dsk = {} - for i, part in enumerate(partition_idx): - dsk[name, i] = (_cx_part, (self.obj._name, part), bbox) - - divisions = 
[self.obj.divisions[i] for i in partition_idx] + [ - self.obj.divisions[partition_idx[-1] + 1] - ] - else: - # TODO can a dask dataframe have 0 partitions? - dsk = {(name, 0): self.obj._meta.head(0)} - divisions = [None, None] - - graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self.obj]) - return new_dd_object(graph, name, meta=self.obj._meta, divisions=divisions) +warnings.warn( + "dask_geopandas.core is deprecated and will be removed in a future version.", + category=FutureWarning, + stacklevel=1, +) diff --git a/dask_geopandas/expr.py b/dask_geopandas/expr.py index dcdafa8..f84bf81 100644 --- a/dask_geopandas/expr.py +++ b/dask_geopandas/expr.py @@ -6,20 +6,28 @@ import dask import dask.array as da +import dask.dataframe import dask.dataframe as dd -import dask_expr as dx from dask.base import tokenize -from dask.dataframe import map_partitions -from dask.highlevelgraph import HighLevelGraph -from dask.utils import M, OperatorMethodMixin, derived_from, ignore_warning -from dask_expr import ( +from dask.dataframe import from_graph, get_collection_type, map_partitions +from dask.dataframe.api import ( + ApplyConcatApply, + FrameBase, elemwise, - from_graph, - get_collection_type, + emulate, + new_collection, ) -from dask_expr._collection import new_collection -from dask_expr._expr import ApplyConcatApply, _emulate +from dask.highlevelgraph import HighLevelGraph +from dask.utils import M, OperatorMethodMixin, derived_from, ignore_warning +# from dask_expr import ( +# elemwise, +# from_graph, +# get_collection_type, +# ) +# from dask_expr._collection import new_collection +# from dask_expr._expr import ApplyConcatApply, _emulate +# import dask_expr as dx import geopandas import shapely from shapely.geometry import box @@ -102,7 +110,7 @@ def _set_crs(df, crs, allow_override): return df.set_crs(crs, allow_override=allow_override) -class _Frame(dx.FrameBase, OperatorMethodMixin): +class _Frame(FrameBase, OperatorMethodMixin): """Superclass for DataFrame and Series Parameters @@ -213,7 +221,7 @@ def _bind_elemwise_operator_method(cls, name, op, original, *args, **kwargs): # name must be explicitly passed for div method whose name is truediv def meth(self, other, *args, **kwargs): - meta = _emulate(op, self, other) + meta = emulate(op, self, other) return map_partitions( op, self, other, *args, meta=meta, enforce_metadata=False, **kwargs ) @@ -869,7 +877,7 @@ def explode(self, column=None, ignore_index=False, index_parts=None): ) -from_geopandas = dx.from_pandas +from_geopandas = dd.from_pandas def from_dask_dataframe(df, geometry=None): @@ -889,7 +897,7 @@ def from_dask_dataframe(df, geometry=None): # it via a keyword-argument due to https://github.com/dask/dask/issues/8308. # Instead, we assign the geometry column using regular dataframe operations, # then refer to that column by name in `map_partitions`. 
- if isinstance(geometry, dx.Series): + if isinstance(geometry, dask.dataframe.Series): name = geometry.name if geometry.name is not None else "geometry" return df.assign(**{name: geometry}).map_partitions( geopandas.GeoDataFrame, geometry=name diff --git a/dask_geopandas/io/arrow.py b/dask_geopandas/io/arrow.py index d6601c9..f4ad8b8 100644 --- a/dask_geopandas/io/arrow.py +++ b/dask_geopandas/io/arrow.py @@ -9,7 +9,8 @@ import dask from dask.base import compute_as_if_collection, tokenize -from dask.dataframe.core import Scalar, new_dd_object +from dask.dataframe import from_graph +from dask.dataframe.dask_expr._collection import Scalar from dask.highlevelgraph import HighLevelGraph from dask.layers import DataFrameIOLayer from dask.utils import apply, natural_sort_key @@ -17,8 +18,6 @@ import geopandas import shapely.geometry -from .. import backends - DASK_2022_12_0_PLUS = Version(dask.__version__) >= Version("2022.12.0") DASK_2023_04_0 = Version(dask.__version__) >= Version("2023.4.0") @@ -94,7 +93,6 @@ class ArrowDatasetEngine: @classmethod def read_metadata(cls, fs, paths, columns, filters, index): - import pyarrow.dataset as ds from pyarrow.parquet import _filters_to_expression @@ -142,7 +140,6 @@ def read_metadata(cls, fs, paths, columns, filters, index): def _arrow_table_to_pandas( cls, arrow_table: "pyarrow.Table", categories, **kwargs ) -> pd.DataFrame: - _kwargs = kwargs.get("arrow_to_pandas", {}) _kwargs.update({"use_threads": False, "ignore_metadata": False}) use_nullable_dtypes = _extract_nullable_dtypes(**kwargs) @@ -169,7 +166,6 @@ def _types_mapper(pa_type): @classmethod def read_partition(cls, fs, fragment, schema, columns, filter, **kwargs): - table = fragment.to_table( schema=schema, columns=columns, filter=filter, use_threads=False ) @@ -391,19 +387,13 @@ def read_feather( label=label, ) graph = HighLevelGraph({output_name: layer}, {output_name: set()}) - - if backends.QUERY_PLANNING_ON: - from dask_expr import from_graph - - result = from_graph( - graph, - meta, - [None] * (len(parts) + 1), - [(output_name, i) for i in range(len(parts))], - "read_feather", - ) - else: - result = new_dd_object(graph, output_name, meta, [None] * (len(parts) + 1)) + result = from_graph( + graph, + meta, + [None] * (len(parts) + 1), + [(output_name, i) for i in range(len(parts))], + "read_feather", + ) result.spatial_partitions = spatial_partitions return result diff --git a/dask_geopandas/io/file.py b/dask_geopandas/io/file.py index 3870d84..a225a44 100644 --- a/dask_geopandas/io/file.py +++ b/dask_geopandas/io/file.py @@ -3,11 +3,9 @@ from pandas import RangeIndex from dask.base import tokenize -from dask.dataframe.core import new_dd_object +from dask.dataframe import from_graph from dask.highlevelgraph import HighLevelGraph -from .. 
import backends - class FileFunctionWrapper: """ @@ -141,16 +139,11 @@ def read_file( ) graph = HighLevelGraph({output_name: layer}, {output_name: set()}) - if backends.QUERY_PLANNING_ON: - from dask_expr import from_graph - - result = from_graph( - graph, - meta, - divs, - [(output_name, i) for i in range(len(divs) - 1)], - "read_file", - ) - return result - else: - return new_dd_object(graph, output_name, meta, divs) + result = from_graph( + graph, + meta, + divs, + [(output_name, i) for i in range(len(divs) - 1)], + "read_file", + ) + return result diff --git a/dask_geopandas/io/parquet.py b/dask_geopandas/io/parquet.py index b0f9924..3ccc575 100644 --- a/dask_geopandas/io/parquet.py +++ b/dask_geopandas/io/parquet.py @@ -4,7 +4,6 @@ import geopandas -from .. import backends from .arrow import ( DASK_2022_12_0_PLUS, DASK_2023_04_0, @@ -101,22 +100,19 @@ def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False): def read_parquet(*args, **kwargs): - from dask.dataframe.io import read_parquet + from dask.dataframe import read_parquet result = read_parquet(*args, engine=GeoArrowEngine, **kwargs) # check if spatial partitioning information was stored spatial_partitions = result._meta.attrs.get("spatial_partitions", None) - if backends.QUERY_PLANNING_ON: - from dask_expr import from_graph - - result = from_graph( - result.dask, - result._meta, - result.divisions, - result.__dask_keys__(), - "read_parquet", - ) + result = dd.from_graph( + result.dask, + result._meta, + result.divisions, + result.__dask_keys__(), + "read_parquet", + ) result.spatial_partitions = spatial_partitions return result diff --git a/dask_geopandas/sjoin.py b/dask_geopandas/sjoin.py index 95283c8..4bcc280 100644 --- a/dask_geopandas/sjoin.py +++ b/dask_geopandas/sjoin.py @@ -3,16 +3,12 @@ import numpy as np from dask.base import tokenize +from dask.dataframe import from_graph from dask.highlevelgraph import HighLevelGraph import geopandas -from . import backends - -if backends.QUERY_PLANNING_ON: - from .expr import from_geopandas -else: - from .core import from_geopandas +from .expr import from_geopandas def sjoin(left, right, how="inner", predicate="intersects", **kwargs): @@ -62,13 +58,12 @@ def sjoin(left, right, how="inner", predicate="intersects", **kwargs): if isinstance(right, geopandas.GeoDataFrame): right = from_geopandas(right, npartitions=1) - if backends.QUERY_PLANNING_ON: - # We call optimize on the inputs to ensure that any optimizations - # done by dask-expr (which might change the expression, and thus the - # name of the DataFrame) *before* we build the HighLevelGraph. - # https://github.com/dask/dask-expr/issues/1129 - left = left.optimize() - right = right.optimize() + # We call optimize on the inputs to ensure that any optimizations + # done by dask-expr (which might change the expression, and thus the + # name of the DataFrame) *before* we build the HighLevelGraph. 
+ # https://github.com/dask/dask-expr/issues/1129 + left = left.optimize() + right = right.optimize() name = "sjoin-" + tokenize(left, right, how, predicate) meta = geopandas.sjoin(left._meta, right._meta, how=how, predicate=predicate) @@ -122,13 +117,6 @@ def sjoin(left, right, how="inner", predicate="intersects", **kwargs): else: new_spatial_partitions = None - if backends.QUERY_PLANNING_ON: - from dask_expr import from_graph - - result = from_graph(graph, meta, divisions, dsk.keys(), "sjoin") - result.spatial_partitions = new_spatial_partitions - return result - else: - from .core import GeoDataFrame - - return GeoDataFrame(graph, name, meta, divisions, new_spatial_partitions) + result = from_graph(graph, meta, divisions, dsk.keys(), "sjoin") + result.spatial_partitions = new_spatial_partitions + return result diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py index e46ec7c..e33cc45 100644 --- a/dask_geopandas/tests/test_core.py +++ b/dask_geopandas/tests/test_core.py @@ -6,24 +6,18 @@ import dask import dask.dataframe as dd +from dask.dataframe.dask_expr._collection import Scalar import geopandas from shapely.geometry import LineString, MultiPoint, Point, Polygon import dask_geopandas - -import pytest - -if dask_geopandas.backends.QUERY_PLANNING_ON: - from dask_expr._collection import Scalar -else: - from dask.dataframe.core import Scalar - -from dask_geopandas.core import GEOPANDAS_1_0, PANDAS_2_0_0 +from dask_geopandas.expr import GEOPANDAS_1_0, PANDAS_2_0_0 from dask_geopandas.geohash import _geohash from dask_geopandas.hilbert_distance import _hilbert_distance from dask_geopandas.morton_distance import _morton_distance +import pytest from geopandas.testing import assert_geodataframe_equal, assert_geoseries_equal from pandas.testing import assert_frame_equal, assert_series_equal @@ -545,22 +539,15 @@ def test_from_dask_dataframe_with_dask_geoseries(): dask_obj, geometry=dask_geopandas.points_from_xy(dask_obj, "x", "y") ) - if dask_geopandas.backends.QUERY_PLANNING_ON: - d0 = dask_obj.expr.dependencies() - assert len(d0) == 1 # Assign(frame=df) + d0 = dask_obj.expr.dependencies() + assert len(d0) == 1 # Assign(frame=df) - d1 = d0[0].dependencies() - assert len(d1) == 2 # [df, MapPartitions] + d1 = d0[0].dependencies() + assert len(d1) == 2 # [df, MapPartitions] - # the fact that `geometry` isn't in this map_partitions - # should be sufficient to ensure it isn't in the graph twice - assert len(d1[1].dependencies()) == 1 # [df] - else: - # Check that the geometry isn't concatenated and embedded a second time in - # the high-level graph. cf. 
https://github.com/geopandas/dask-geopandas/issues/197 - k = next(k for k in dask_obj.dask.dependencies if k.startswith("GeoDataFrame")) - deps = dask_obj.dask.dependencies[k] - assert len(deps) == 1 + # the fact that `geometry` isn't in this map_partitions + # should be sufficient to ensure it isn't in the graph twice + assert len(d1[1].dependencies()) == 1 # [df] expected = df.set_geometry(geopandas.points_from_xy(df["x"], df["y"])) assert_geoseries_equal(dask_obj.geometry.compute(), expected.geometry) @@ -822,7 +809,6 @@ def test_sum(self): # TODO dissolve with split out is not yet working with expressions @pytest.mark.xfail( - dask_geopandas.backends.QUERY_PLANNING_ON, reason="Need to fix dissolve with split_out", ) def test_split_out(self): diff --git a/pyproject.toml b/pyproject.toml index a36c32d..7d00869 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ requires-python = ">=3.9" dependencies = [ "geopandas>=0.12", "shapely>=2.0", - "dask[dataframe]>=2022.06.0", + # "dask[dataframe]>=2025.1.0", + "dask @ git+https://github.com/dask/dask", # TODO: wait for release. "packaging", ] From e17e6a347493f021a4801f3cec3f26b650d500fb Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 11 Jan 2025 16:22:55 -0600 Subject: [PATCH 02/16] env update --- ci/envs/{39-minimal.yaml => 310-minimal.yaml} | 8 +++--- ci/envs/310-no-optional-deps.yaml | 13 ---------- ci/envs/311-no-expr.yaml | 26 ------------------- 3 files changed, 5 insertions(+), 42 deletions(-) rename ci/envs/{39-minimal.yaml => 310-minimal.yaml} (61%) delete mode 100644 ci/envs/310-no-optional-deps.yaml delete mode 100644 ci/envs/311-no-expr.yaml diff --git a/ci/envs/39-minimal.yaml b/ci/envs/310-minimal.yaml similarity index 61% rename from ci/envs/39-minimal.yaml rename to ci/envs/310-minimal.yaml index 7097403..7b00f23 100644 --- a/ci/envs/39-minimal.yaml +++ b/ci/envs/310-minimal.yaml @@ -3,10 +3,10 @@ channels: - conda-forge dependencies: # required dependencies - - python=3.9 + - python=3.10 - numpy=1.23 - - dask=2022.06.0 - - distributed=2022.06.0 + # - dask=2025.1.0 # TODO: update this once released + # - distributed=2025.1.0 - geopandas=0.12 - pandas=1.5.3 - shapely=2.0 @@ -22,3 +22,5 @@ dependencies: - pip - pip: - pymorton + - git+https://github.com/dask/dask.git@main + - git+https://github.com/dask/distributed.git@main diff --git a/ci/envs/310-no-optional-deps.yaml b/ci/envs/310-no-optional-deps.yaml deleted file mode 100644 index 99fa647..0000000 --- a/ci/envs/310-no-optional-deps.yaml +++ /dev/null @@ -1,13 +0,0 @@ -name: test -channels: - - conda-forge -dependencies: - # required dependencies - - python=3.10 - - dask - - geopandas - - pyproj - - packaging - # test dependencies - - pytest - - pytest-cov diff --git a/ci/envs/311-no-expr.yaml b/ci/envs/311-no-expr.yaml deleted file mode 100644 index cf88d56..0000000 --- a/ci/envs/311-no-expr.yaml +++ /dev/null @@ -1,26 +0,0 @@ -name: test -channels: - - conda-forge -dependencies: - # required dependencies - - python=3.11 - # pin to last release before dask-expr was turned on by default - - dask=2024.2 - - distributed=2024.2 - - geopandas - - pyproj=3.4 - - packaging - # test dependencies - - pytest - - pytest-cov - - hilbertcurve - - s3fs - - moto<5 # <5 pin because of https://github.com/dask/dask/issues/10869 - - flask # needed for moto server - # optional dependencies - - pyarrow - - pyogrio>=0.4 - - pygeohash - - pip - - pip: - - pymorton From 6be29e826063c0e0e208091ba6950f1db311d570 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 11 Jan 
2025 16:25:20 -0600 Subject: [PATCH 03/16] Env updates --- .github/workflows/tests.yaml | 2 +- CHANGELOG.md | 9 +++++++++ ci/envs/310-no-optional-deps.yaml | 13 +++++++++++++ pyproject.toml | 2 +- 4 files changed, 24 insertions(+), 2 deletions(-) create mode 100644 ci/envs/310-no-optional-deps.yaml diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index a595867..28cf305 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -29,8 +29,8 @@ jobs: matrix: os: [ubuntu-latest] env: + - ci/envs/310-minimal.yaml - ci/envs/310-no-optional-deps.yaml - - ci/envs/39-minimal.yaml - ci/envs/311-no-expr.yaml - ci/envs/311-latest.yaml - ci/envs/311-latest-no-expr.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c4773d..4f8e537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ Changelog ========= +Version 0.4.3 (January, 2025) +---------------------------------- + +Packaging: + +- `dask>=2025.1.0` is now required. +- `python>=3.10` is now required. + + Version 0.4.2 (September 24, 2024) ---------------------------------- diff --git a/ci/envs/310-no-optional-deps.yaml b/ci/envs/310-no-optional-deps.yaml new file mode 100644 index 0000000..99fa647 --- /dev/null +++ b/ci/envs/310-no-optional-deps.yaml @@ -0,0 +1,13 @@ +name: test +channels: + - conda-forge +dependencies: + # required dependencies + - python=3.10 + - dask + - geopandas + - pyproj + - packaging + # test dependencies + - pytest + - pytest-cov diff --git a/pyproject.toml b/pyproject.toml index 7d00869..628865d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ classifiers = [ "Topic :: Scientific/Engineering :: GIS", "Topic :: System :: Distributed Computing", ] -requires-python = ">=3.9" +requires-python = ">=3.10" dependencies = [ "geopandas>=0.12", "shapely>=2.0", From 0bf4f992531741e74e197efa51b7283b69672dd1 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:01:17 -0600 Subject: [PATCH 04/16] parquet working --- dask_geopandas/io/parquet.py | 70 +++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/dask_geopandas/io/parquet.py b/dask_geopandas/io/parquet.py index 3ccc575..d22c552 100644 --- a/dask_geopandas/io/parquet.py +++ b/dask_geopandas/io/parquet.py @@ -1,7 +1,6 @@ -from functools import partial +from functools import cached_property, partial import dask.dataframe as dd - import geopandas from .arrow import ( @@ -44,6 +43,18 @@ def _get_partition_bounds_parquet(part, fs): return _get_partition_bounds(pq_metadata.metadata) +def _convert_to_list(column) -> list | None: + if column is None or isinstance(column, list): + pass + elif isinstance(column, tuple): + column = list(column) + elif hasattr(column, "dtype"): + column = column.tolist() + else: + column = [column] + return column + + class GeoArrowEngine(GeoDatasetEngine, DaskArrowDatasetEngine): """ Engine for reading geospatial Parquet datasets. Subclasses dask's @@ -51,12 +62,15 @@ class GeoArrowEngine(GeoDatasetEngine, DaskArrowDatasetEngine): correctly read/write GeoDataFrames. 
""" - - @classmethod - def read_metadata(cls, fs, paths, **kwargs): - meta, stats, parts, index = super().read_metadata(fs, paths, **kwargs) - - gather_spatial_partitions = kwargs.pop("gather_spatial_partitions", True) + @cached_property + def _meta(self): + meta = super()._meta + gather_spatial_partitions = self._dataset_info.get( + "gather_spatial_partitions", True + ) + fs = self._dataset_info["fs"] + parts = self._dataset_info["parts"] + breakpoint() if gather_spatial_partitions: regions = geopandas.GeoSeries( @@ -67,7 +81,24 @@ def read_metadata(cls, fs, paths, **kwargs): # a bit hacky, but this allows us to get this passed through meta.attrs["spatial_partitions"] = regions - return (meta, stats, parts, index) + return meta + + # @classmethod + # def read_metadata(cls, fs, paths, **kwargs): + # meta, stats, parts, index = super().read_metadata(fs, paths, **kwargs) + + # gather_spatial_partitions = kwargs.pop("gather_spatial_partitions", True) + + # if gather_spatial_partitions: + # regions = geopandas.GeoSeries( + # [_get_partition_bounds_parquet(part, fs) for part in parts], + # crs=meta.crs, + # ) + # if regions.notna().all(): + # # a bit hacky, but this allows us to get this passed through + # meta.attrs["spatial_partitions"] = regions + + # return (meta, stats, parts, index) @classmethod def _update_meta(cls, meta, schema): @@ -77,13 +108,8 @@ def _update_meta(cls, meta, schema): return _update_meta_to_geodataframe(meta, schema.metadata) @classmethod - def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False): - """Overriding private method for dask >= 2021.10.0""" - if DASK_2022_12_0_PLUS and not DASK_2023_04_0: - meta = super()._create_dd_meta(dataset_info, use_nullable_dtypes) - else: - meta = super()._create_dd_meta(dataset_info) - + def _create_dd_meta(cls, dataset_info): + meta = super()._create_dd_meta(dataset_info) schema = dataset_info["schema"] if not schema.names and not schema.metadata: if len(list(dataset_info["ds"].get_fragments())) == 0: @@ -92,6 +118,18 @@ def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False): "to read it as an empty DataFrame" ) meta = cls._update_meta(meta, schema) + + if dataset_info["kwargs"].get("gather_spatial_partitions", True): + fs = dataset_info["fs"] + parts, _, _ = cls._construct_collection_plan(dataset_info) + regions = geopandas.GeoSeries( + [_get_partition_bounds_parquet(part, fs) for part in parts], + crs=meta.crs, + ) + if regions.notna().all(): + # a bit hacky, but this allows us to get this passed through + meta.attrs["spatial_partitions"] = regions + return meta From 0d74371eae753f1dbab6cac1533f84bd4ff740ef Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:03:09 -0600 Subject: [PATCH 05/16] lint --- dask_geopandas/io/parquet.py | 42 ++---------------------------------- 1 file changed, 2 insertions(+), 40 deletions(-) diff --git a/dask_geopandas/io/parquet.py b/dask_geopandas/io/parquet.py index d22c552..0185889 100644 --- a/dask_geopandas/io/parquet.py +++ b/dask_geopandas/io/parquet.py @@ -1,11 +1,10 @@ -from functools import cached_property, partial +from functools import partial import dask.dataframe as dd + import geopandas from .arrow import ( - DASK_2022_12_0_PLUS, - DASK_2023_04_0, GeoDatasetEngine, _get_partition_bounds, _update_meta_to_geodataframe, @@ -62,43 +61,6 @@ class GeoArrowEngine(GeoDatasetEngine, DaskArrowDatasetEngine): correctly read/write GeoDataFrames. 
""" - @cached_property - def _meta(self): - meta = super()._meta - gather_spatial_partitions = self._dataset_info.get( - "gather_spatial_partitions", True - ) - fs = self._dataset_info["fs"] - parts = self._dataset_info["parts"] - breakpoint() - - if gather_spatial_partitions: - regions = geopandas.GeoSeries( - [_get_partition_bounds_parquet(part, fs) for part in parts], - crs=meta.crs, - ) - if regions.notna().all(): - # a bit hacky, but this allows us to get this passed through - meta.attrs["spatial_partitions"] = regions - - return meta - - # @classmethod - # def read_metadata(cls, fs, paths, **kwargs): - # meta, stats, parts, index = super().read_metadata(fs, paths, **kwargs) - - # gather_spatial_partitions = kwargs.pop("gather_spatial_partitions", True) - - # if gather_spatial_partitions: - # regions = geopandas.GeoSeries( - # [_get_partition_bounds_parquet(part, fs) for part in parts], - # crs=meta.crs, - # ) - # if regions.notna().all(): - # # a bit hacky, but this allows us to get this passed through - # meta.attrs["spatial_partitions"] = regions - - # return (meta, stats, parts, index) @classmethod def _update_meta(cls, meta, schema): From 1ad0723c61bf2b68a3659db3a109393603ca1ee0 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:13:58 -0600 Subject: [PATCH 06/16] Fixed GHA --- .github/workflows/tests.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 28cf305..6bde27b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -52,7 +52,7 @@ jobs: with: environment-file: ${{ matrix.env }} miniforge-version: latest - miniforge-variant: Mambaforge + miniforge-variant: Miniforge3 use-mamba: true - name: Check and Log Environment From a5f05022370ef5cbb0d4af83977d27a359b36cfb Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:20:45 -0600 Subject: [PATCH 07/16] bump min versions to pandas 2.0 --- ci/envs/310-minimal.yaml | 4 ++-- dask_geopandas/backends.py | 2 +- dask_geopandas/tests/test_core.py | 5 +++++ pyproject.toml | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/ci/envs/310-minimal.yaml b/ci/envs/310-minimal.yaml index 7b00f23..b7b5325 100644 --- a/ci/envs/310-minimal.yaml +++ b/ci/envs/310-minimal.yaml @@ -7,8 +7,8 @@ dependencies: - numpy=1.23 # - dask=2025.1.0 # TODO: update this once released # - distributed=2025.1.0 - - geopandas=0.12 - - pandas=1.5.3 + - geopandas=0.14.3 + - pandas=2.0.0 - shapely=2.0 - pyproj=3.4 - packaging diff --git a/dask_geopandas/backends.py b/dask_geopandas/backends.py index 1a6919e..0424aca 100644 --- a/dask_geopandas/backends.py +++ b/dask_geopandas/backends.py @@ -16,7 +16,7 @@ from geopandas.array import GeometryArray, GeometryDtype, from_shapely from shapely.geometry.base import BaseGeometry -from .core import GeoDataFrame, GeoSeries +from .expr import GeoDataFrame, GeoSeries get_parallel_type.register(geopandas.GeoDataFrame, lambda _: GeoDataFrame) get_parallel_type.register(geopandas.GeoSeries, lambda _: GeoSeries) diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py index e33cc45..103bcf8 100644 --- a/dask_geopandas/tests/test_core.py +++ b/dask_geopandas/tests/test_core.py @@ -1032,3 +1032,8 @@ def get_chunk(n): expected = geopandas.GeoDataFrame({"col": [1, 1], "geometry": [Point(1, 1)] * 2}) assert_geodataframe_equal(ddf.compute(), expected) + + +def test_core_deprecated(): + with pytest.warns(FutureWarning, match="dask_geopandas.core"): + 
import dask_geopandas.core # noqa: F401 diff --git a/pyproject.toml b/pyproject.toml index 628865d..82b5b1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ classifiers = [ ] requires-python = ">=3.10" dependencies = [ - "geopandas>=0.12", + "geopandas>=0.14.3", "shapely>=2.0", # "dask[dataframe]>=2025.1.0", "dask @ git+https://github.com/dask/dask", # TODO: wait for release. From 891c7514ec0dc4780e1bef018011a592dbdb3c1e Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:22:15 -0600 Subject: [PATCH 08/16] cleanup --- dask_geopandas/expr.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/dask_geopandas/expr.py b/dask_geopandas/expr.py index f84bf81..d148963 100644 --- a/dask_geopandas/expr.py +++ b/dask_geopandas/expr.py @@ -20,14 +20,6 @@ from dask.highlevelgraph import HighLevelGraph from dask.utils import M, OperatorMethodMixin, derived_from, ignore_warning -# from dask_expr import ( -# elemwise, -# from_graph, -# get_collection_type, -# ) -# from dask_expr._collection import new_collection -# from dask_expr._expr import ApplyConcatApply, _emulate -# import dask_expr as dx import geopandas import shapely from shapely.geometry import box From d867bc2bf6e6a1e14afefbd9cac9b16a14e321c4 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 12 Jan 2025 09:36:30 -0600 Subject: [PATCH 09/16] scalar imports --- dask_geopandas/io/arrow.py | 3 +-- dask_geopandas/tests/test_core.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dask_geopandas/io/arrow.py b/dask_geopandas/io/arrow.py index f4ad8b8..6a8f1a7 100644 --- a/dask_geopandas/io/arrow.py +++ b/dask_geopandas/io/arrow.py @@ -9,8 +9,7 @@ import dask from dask.base import compute_as_if_collection, tokenize -from dask.dataframe import from_graph -from dask.dataframe.dask_expr._collection import Scalar +from dask.dataframe import Scalar, from_graph from dask.highlevelgraph import HighLevelGraph from dask.layers import DataFrameIOLayer from dask.utils import apply, natural_sort_key diff --git a/dask_geopandas/tests/test_core.py b/dask_geopandas/tests/test_core.py index 103bcf8..3031261 100644 --- a/dask_geopandas/tests/test_core.py +++ b/dask_geopandas/tests/test_core.py @@ -6,7 +6,7 @@ import dask import dask.dataframe as dd -from dask.dataframe.dask_expr._collection import Scalar +from dask.dataframe import Scalar import geopandas from shapely.geometry import LineString, MultiPoint, Point, Polygon From 1ebc3e4fb22249cdbafaa83e2130227dfed60f09 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 18 Jan 2025 16:04:57 -0700 Subject: [PATCH 10/16] Bump --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 82b5b1b..c5190d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,8 +34,7 @@ requires-python = ">=3.10" dependencies = [ "geopandas>=0.14.3", "shapely>=2.0", - # "dask[dataframe]>=2025.1.0", - "dask @ git+https://github.com/dask/dask", # TODO: wait for release. 
+ "dask[dataframe]>=2025.1.0", "packaging", ] From a5dd51e53c39648ac47341e5ac8334b4b6cf6e58 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 18 Jan 2025 16:06:42 -0700 Subject: [PATCH 11/16] Fixed env --- ci/envs/310-minimal.yaml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ci/envs/310-minimal.yaml b/ci/envs/310-minimal.yaml index b7b5325..2ace3d7 100644 --- a/ci/envs/310-minimal.yaml +++ b/ci/envs/310-minimal.yaml @@ -5,8 +5,8 @@ dependencies: # required dependencies - python=3.10 - numpy=1.23 - # - dask=2025.1.0 # TODO: update this once released - # - distributed=2025.1.0 + - dask=2025.1.0 + - distributed=2025.1.0 - geopandas=0.14.3 - pandas=2.0.0 - shapely=2.0 @@ -22,5 +22,3 @@ dependencies: - pip - pip: - pymorton - - git+https://github.com/dask/dask.git@main - - git+https://github.com/dask/distributed.git@main From aaac02c5765ee8de15075f14503e60bc4c769ea6 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sat, 18 Jan 2025 16:08:22 -0700 Subject: [PATCH 12/16] remove unused convert_to_list --- dask_geopandas/io/parquet.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/dask_geopandas/io/parquet.py b/dask_geopandas/io/parquet.py index 0185889..dcce6fd 100644 --- a/dask_geopandas/io/parquet.py +++ b/dask_geopandas/io/parquet.py @@ -42,18 +42,6 @@ def _get_partition_bounds_parquet(part, fs): return _get_partition_bounds(pq_metadata.metadata) -def _convert_to_list(column) -> list | None: - if column is None or isinstance(column, list): - pass - elif isinstance(column, tuple): - column = list(column) - elif hasattr(column, "dtype"): - column = column.tolist() - else: - column = [column] - return column - - class GeoArrowEngine(GeoDatasetEngine, DaskArrowDatasetEngine): """ Engine for reading geospatial Parquet datasets. 
Subclasses dask's From c2f3bd19d3b6cebca590a83b675832b970dba6dc Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 19 Jan 2025 11:04:43 -0700 Subject: [PATCH 13/16] removed no-expr builds --- .github/workflows/tests.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6bde27b..89e37bd 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -31,9 +31,7 @@ jobs: env: - ci/envs/310-minimal.yaml - ci/envs/310-no-optional-deps.yaml - - ci/envs/311-no-expr.yaml - ci/envs/311-latest.yaml - - ci/envs/311-latest-no-expr.yaml - ci/envs/312-latest.yaml include: From 7d1c64e453704e91e36070ac899243aa8a07307d Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 19 Jan 2025 11:05:27 -0700 Subject: [PATCH 14/16] fix envs --- ci/envs/312-dev.yaml | 1 - doc/requirements.txt | 2 -- 2 files changed, 3 deletions(-) diff --git a/ci/envs/312-dev.yaml b/ci/envs/312-dev.yaml index 8512352..2b2d3ab 100644 --- a/ci/envs/312-dev.yaml +++ b/ci/envs/312-dev.yaml @@ -28,4 +28,3 @@ dependencies: - git+https://github.com/shapely/shapely.git@main - git+https://github.com/geopandas/geopandas.git@main - git+https://github.com/dask/dask.git@main - - git+https://github.com/dask-contrib/dask-expr.git@main diff --git a/doc/requirements.txt b/doc/requirements.txt index 8f119ed..ab605ce 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -6,5 +6,3 @@ myst-parser sphinx_copybutton sphinx matplotlib -# temporarily add this until that is included with dask itself -dask-expr \ No newline at end of file From 298fdef4d58913754e6420d682a9e3ea35f5cf44 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 19 Jan 2025 11:18:55 -0700 Subject: [PATCH 15/16] bump --- ci/envs/310-minimal.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/envs/310-minimal.yaml b/ci/envs/310-minimal.yaml index 2ace3d7..f8f8d3a 100644 --- a/ci/envs/310-minimal.yaml +++ b/ci/envs/310-minimal.yaml @@ -4,7 +4,7 @@ channels: dependencies: # required dependencies - python=3.10 - - numpy=1.23 + - numpy=1.24 - dask=2025.1.0 - distributed=2025.1.0 - geopandas=0.14.3 From faff935e2ca6718618c56b1c4ae8c89991c206f0 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 19 Jan 2025 13:05:55 -0700 Subject: [PATCH 16/16] removed use_nullable_dtypes --- dask_geopandas/io/arrow.py | 48 -------------------------------------- 1 file changed, 48 deletions(-) diff --git a/dask_geopandas/io/arrow.py b/dask_geopandas/io/arrow.py index 6a8f1a7..0a10061 100644 --- a/dask_geopandas/io/arrow.py +++ b/dask_geopandas/io/arrow.py @@ -69,16 +69,6 @@ def _get_partition_bounds(schema_metadata): return shapely.geometry.box(*bbox) -def _extract_nullable_dtypes(**kwargs): - if DASK_2023_04_0: - use_nullable_dtypes = kwargs.get("dtype_backend", None) == "numpy_nullable" - elif DASK_2022_12_0_PLUS: - use_nullable_dtypes = kwargs.get("use_nullable_dtypes", False) - else: - use_nullable_dtypes = False - return use_nullable_dtypes - - class ArrowDatasetEngine: """ Custom IO engine based on pyarrow.dataset. 
@@ -141,25 +131,6 @@ def _arrow_table_to_pandas( ) -> pd.DataFrame: _kwargs = kwargs.get("arrow_to_pandas", {}) _kwargs.update({"use_threads": False, "ignore_metadata": False}) - use_nullable_dtypes = _extract_nullable_dtypes(**kwargs) - - if use_nullable_dtypes: - from dask.dataframe.io.parquet.arrow import PYARROW_NULLABLE_DTYPE_MAPPING - - if "types_mapper" in _kwargs: - # User-provided entries take priority over - # PYARROW_NULLABLE_DTYPE_MAPPING - types_mapper = _kwargs["types_mapper"] - - def _types_mapper(pa_type): - return types_mapper(pa_type) or PYARROW_NULLABLE_DTYPE_MAPPING.get( - pa_type - ) - - _kwargs["types_mapper"] = _types_mapper - - else: - _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get return arrow_table.to_pandas(categories=categories, **_kwargs) @@ -199,25 +170,6 @@ def _arrow_table_to_pandas( _kwargs = kwargs.get("arrow_to_pandas", {}) _kwargs.update({"use_threads": False, "ignore_metadata": False}) - use_nullable_dtypes = _extract_nullable_dtypes(**kwargs) - - if use_nullable_dtypes: - from dask.dataframe.io.parquet.arrow import PYARROW_NULLABLE_DTYPE_MAPPING - - if "types_mapper" in _kwargs: - # User-provided entries take priority over - # PYARROW_NULLABLE_DTYPE_MAPPING - types_mapper = _kwargs["types_mapper"] - - def _types_mapper(pa_type): - return types_mapper(pa_type) or PYARROW_NULLABLE_DTYPE_MAPPING.get( - pa_type - ) - - _kwargs["types_mapper"] = _types_mapper - - else: - _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get # TODO support additional keywords try:
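
With the _extract_nullable_dtypes helper and the use_nullable_dtypes / dtype_backend branches removed in PATCH 16, the arrow-to-pandas conversion in these engines reduces to the few context lines the hunks above keep. The following is a minimal standalone sketch of that remaining logic, not part of the patch itself: in the real code this is a method on the engine classes, and the parameter names and defaults here are assumed from the visible context.

    import pandas as pd
    import pyarrow as pa

    def _arrow_table_to_pandas(
        arrow_table: pa.Table, categories=None, **kwargs
    ) -> pd.DataFrame:
        # Start from any user-supplied pyarrow-to-pandas options...
        _kwargs = kwargs.get("arrow_to_pandas", {})
        # ...but force the single-threaded, metadata-ignoring conversion the engines rely on.
        _kwargs.update({"use_threads": False, "ignore_metadata": False})
        return arrow_table.to_pandas(categories=categories, **_kwargs)

As far as these hunks show, the engines no longer install a nullable-dtype types_mapper themselves, so any custom type mapping now has to be passed in through the arrow_to_pandas keyword.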