Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dataset to stac converter #173

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions odc/stac/eo3/_eo3converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"view:azimuth": "eo:azimuth",
"view:sun_azimuth": "eo:sun_azimuth",
"view:sun_elevation": "eo:sun_elevation",
"created": "odc:processing_datetime",
}

(_eo3,) = (
Expand Down
283 changes: 283 additions & 0 deletions odc/stac/eo3/_stacconverter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
import datetime
import math
from pathlib import Path
from typing import Iterable, Iterator
import mimetypes

import pystac.asset
import pystac.collection
import pystac.errors
import pystac.item
from pystac.extensions.eo import Band, EOExtension
from pystac.extensions.projection import ProjectionExtension
from pystac.extensions.view import ViewExtension
from pystac import Asset, Link, MediaType
from pystac.utils import datetime_to_str
from pystac.errors import STACError

from datacube.model import Dataset
from datacube.utils.uris import uri_resolve
from datacube.index.eo3 import EO3Grid

from ._eo3converter import STAC_TO_EO3_RENAMES

MAPPING_EO3_TO_STAC = {v: k for k, v in STAC_TO_EO3_RENAMES.items()}

def _as_stac_instruments(value: str):
"""
>>> _as_stac_instruments('TM')
['tm']
>>> _as_stac_instruments('OLI')
['oli']
>>> _as_stac_instruments('ETM+')
['etm']
>>> _as_stac_instruments('OLI_TIRS')
['oli', 'tirs']
"""
return [i.strip("+-").lower() for i in value.split("_")]


# may need to be more robust
def _convert_value_to_stac_type(key: str, value):
"""
Convert return type as per STAC specification
"""
# In STAC spec, "instruments" have [String] type
if key == "eo:instrument":
return _as_stac_instruments(value)
# Convert the non-default datetimes to a string
elif isinstance(value, datetime.datetime) and key != "datetime":
return datetime_to_str(value)
else:
return value


def _media_type(path: Path) -> str:
"""
Add media type of the asset object
"""
mime_type = mimetypes.guess_type(path.name)[0]
if path.suffix == ".sha1":
return MediaType.TEXT
elif path.suffix == ".yaml":
return "text/yaml"
elif mime_type:
if mime_type == "image/tiff":
return MediaType.COG
else:
return mime_type
else:
return "application/octet-stream"


def _asset_roles_fields(asset_name: str) -> list[str]:
"""
Add roles of the asset object
"""
if asset_name.startswith("thumbnail"):
return ["thumbnail"]
else:
return ["metadata"]


def _asset_title_fields(asset_name: str) -> str | None:
"""
Add title of the asset object
"""
if asset_name.startswith("thumbnail"):
return "Thumbnail image"
else:
return None


def _proj_fields(grids: dict[str, EO3Grid], grid_name: str = "default") -> dict:
"""
Get any proj (Stac projection extension) fields if we have them for the grid.
"""
if not grids:
return {}

grid_doc = grids.get(grid_name or "default")
if not grid_doc:
return {}

return {
"shape": grid_doc.shape,
"transform": grid_doc.transform,
}


def _lineage_fields(dataset: Dataset) -> dict:
"""
Add custom lineage field to a STAC Item
"""
dataset_doc = dataset.metadata_doc
# using sources vs source_tree?
if dataset.sources:
lineage = {classifier: [str(d.id)] for classifier, d in dataset.sources}
elif dataset_doc.get("lineage"):
# sometimes lineage is included at 'lineage' instead of 'lineage.source_datasets'
# in which case it should already be in {classifier: [ids]} format
lineage = dataset_doc.get("lineage")
# shouldn't need to account for legacy embedded lineage at this point
else:
return {}
# it seems like derived are not accounted for at all - on purpose?

return {"odc:lineage": lineage}


def eo3_to_stac_properties(dataset: Dataset) -> dict:
"""
Convert EO3 properties dictionary to the Stac equivalent.
"""
title = dataset.metadata.label
# explorer has logic to try and figure out a label if missing, should it be included here?
properties = {
# Put the title at the top for document readability.
**(dict(title=title) if title else {}),
**{
MAPPING_EO3_TO_STAC.get(key, key): _convert_value_to_stac_type(key, val)
for key, val in dataset.metadata_doc.properties.items()
},
}

return properties


def ds_to_item(
dataset: Dataset,
stac_item_url: str | None = None,
) -> pystac.Item:
"""
Convert the given ODC Dataset into a Stac Item document.

Note: You may want to call `validate_item(doc)` on the outputs to find any
incomplete properties.

:param collection_url: URL to the Stac Collection. Either this or an explorer_base_url
should be specified for Stac compliance.
:param stac_item_url: Public 'self' URL where the stac document will be findable.
"""
if not dataset.is_eo3:
raise STACError("Cannot convert non-eo3 datasets to STAC")

if stac_item_url is None and not dataset.uri:
raise ValueError("No dataset location provided")

if dataset.extent is not None:
wgs84_geometry = dataset.extent.to_crs("EPSG:4326", math.inf)
geometry = wgs84_geometry.json
bbox = wgs84_geometry.boundingbox.bbox
else:
geometry = None
bbox = None

properties = eo3_to_stac_properties(dataset)
properties.update(_lineage_fields(dataset))

dt = dataset.time
if dt is None:
raise ValueError("Cannot convert dataset with no datetime information")
dt_info = {}
if dt.begin == dt.end:
dt_info["start_datetime"] = dt.begin
dt_info["end_datetime"] = dt.end
else:
dt_info["datetime"] = dt.begin
properties.pop("datetime", None)

item = pystac.item.Item(
id=str(dataset.id),
**dt_info,
properties=properties,
geometry=geometry,
bbox=bbox,
collection=dataset.product.name,
)

EOExtension.ext(item, add_if_missing=True)

# while dataset._gs has already handled grid information, it doesn't allow us to
# access all the information we need here
grids = {name: EO3Grid(grid_spec) for name, grid_spec in dataset.metadata_doc.get("grids", {}).items()}

if geometry:
proj = ProjectionExtension.ext(item, add_if_missing=True)

if dataset.crs is None:
raise STACError("Projection extension requires either epsg or wkt for crs.")
if dataset.crs.epsg is not None:
proj.apply(epsg=dataset.crs.epsg, **_proj_fields(grids))
else:
proj.apply(wkt2=dataset.crs, **_proj_fields(grids))

# To pass validation, only add 'view' extension when we're using it somewhere.
if any(k.startswith("view:") for k in properties.keys()):
ViewExtension.ext(item, add_if_missing=True)

# Without a dataset location, all paths will be relative.
dataset_location = dataset.uri # or should we default to stac_url?

# Add assets that are data
for name, measurement in dataset.measurements.items():
if not dataset_location and not measurement.get("path"):
# No URL to link to. URL is mandatory for Stac validation.
continue

asset = Asset(
href=uri_resolve(dataset_location, measurement.get("path")),
media_type=_media_type(Path(measurement.get("path"))),
title=name,
roles=["data"],
)
eo = EOExtension.ext(asset)

# TODO: pull out more information about the band
band = Band.create(name)
eo.apply(bands=[band])

if grids:
proj_fields = _proj_fields(grids, measurement.get("grid"))
if proj_fields is not None:
proj = ProjectionExtension.ext(asset)
# Not sure how this handles None for an EPSG code
# should we have a wkt2 case like above?
proj.apply(
**proj_fields,
epsg=dataset.crs.epsg,
)

item.add_asset(name, asset=asset)

# Add assets that are accessories
for name, acc in dataset.accessories.items():
if not dataset_location and not acc.get("path"):
# No URL to link to. URL is mandatory for Stac validation.
continue

asset = Asset(
href=uri_resolve(dataset_location, acc.get("path")),
media_type=_media_type(Path(acc.get("path"))),
title=_asset_title_fields(name),
roles=_asset_roles_fields(name),
)

item.add_asset(name, asset=asset)

# should all item links be handled externally?
if stac_item_url:
item.links.append(
Link(
rel="self",
media_type=MediaType.JSON,
target=stac_item_url,
)
)

return item


def ds2stac(datasets: Iterable[Dataset]) -> Iterator[pystac.item.Item]:
for dataset in datasets:
yield ds_to_item(dataset, dataset.uri)
Loading