Add pyspark & ibis capture to SDK #876

Closed · wants to merge 3 commits
14 changes: 13 additions & 1 deletion ui/sdk/src/hamilton_sdk/driver.py
@@ -57,8 +57,20 @@ def _hash_module(
module,
)

def safe_getmembers(module):
"""Need this because some modules are lazily loaded and we can't get the members."""
try:
return inspect.getmembers(module)
except Exception as e:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
f"Skipping hash for module {module.__name__} because we could not get the members. "
f"Error: {e}"
)
return []

# Loop through the module's attributes
for name, value in inspect.getmembers(module):
for name, value in safe_getmembers(module):
# Check if the attribute is a module
if inspect.ismodule(value):
if value.__package__ is None:
89 changes: 89 additions & 0 deletions ui/sdk/src/hamilton_sdk/tracking/ibis_stats.py
@@ -0,0 +1,89 @@
from typing import Any, Dict

from hamilton_sdk.tracking import stats
from ibis.expr.datatypes import core

# import ibis.expr.types as ir
from ibis.expr.types import relations

"""Module that houses functions to introspect an Ibis Table. We don't have expression support yet.
"""

base_data_type_mapping_dict = {
"timestamp": "datetime",
"date": "datetime",
"string": "str",
"integer": "numeric",
"double": "numeric",
"float": "numeric",
"boolean": "boolean",
"long": "numeric",
"short": "numeric",
}


def base_data_type_mapping(data_type: core.DataType) -> str:
    """Returns the base data type of the column.

    Intended to use the internal is_* type methods to determine the base data type;
    not implemented yet, so everything maps to "unhandled" for now.
    """
    return "unhandled"  # TODO: implement this


base_schema = {
    # we can't get all of these about an Ibis table
"base_data_type": None,
# 'count': 0,
"data_type": None,
# 'histogram': {},
# 'max': 0,
# 'mean': 0,
# 'min': 0,
# 'missing': 0,
"name": None,
"pos": None,
# 'quantiles': {},
# 'std': 0,
# 'zeros': 0
}


def _introspect(table: relations.Table) -> Dict[str, Any]:
"""Introspect a PySpark dataframe and return a dictionary of statistics.

:param df: PySpark dataframe to introspect.
:return: Dictionary of column to metadata about it.
"""
# table.
fields = table.schema().items()
column_to_metadata = []
for idx, (field_name, field_type) in enumerate(fields):
values = base_schema.copy()
values.update(
{
"name": field_name,
"pos": idx,
"data_type": str(field_type),
"base_data_type": base_data_type_mapping(field_type),
"nullable": field_type.nullable,
}
)
column_to_metadata.append(values)
return {
"columns": column_to_metadata,
}


@stats.compute_stats.register
def compute_stats_ibis_table(
result: relations.Table, node_name: str, node_tags: dict
) -> Dict[str, Any]:
# TODO: create custom type instead of dict for UI
o_value = _introspect(result)
return {
"observability_type": "dict",
"observability_value": {
"type": str(type(result)),
"value": o_value,
},
"observability_schema_version": "0.0.2",
}
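

# Example usage sketch (not part of this PR), mirroring the __main__ block in
# pyspark_stats.py below; assumes an ibis version that provides ibis.memtable.
if __name__ == "__main__":
    import pprint

    import ibis

    example_table = ibis.memtable(
        {"a": [1, 2, 3], "b": ["x", "y", "z"], "c": [True, False, True]}
    )
    pprint.pprint(compute_stats_ibis_table(example_table, "example_table", {}))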
110 changes: 110 additions & 0 deletions ui/sdk/src/hamilton_sdk/tracking/pyspark_stats.py
@@ -0,0 +1,110 @@
from typing import Any, Dict

import pyspark.sql as ps
from hamilton_sdk.tracking import stats

"""Module that houses functions to introspect a PySpark dataframe.
"""
# this is a mapping used in the Backend/UI.
# we should probably move this to a shared location.
base_data_type_mapping = {
"timestamp": "datetime",
"date": "datetime",
"string": "str",
"integer": "numeric",
"double": "numeric",
"float": "numeric",
"boolean": "boolean",
"long": "numeric",
"short": "numeric",
}

base_schema = {
# we can't get all of these about a pyspark dataframe
"base_data_type": None,
# 'count': 0,
"data_type": None,
# 'histogram': {},
# 'max': 0,
# 'mean': 0,
# 'min': 0,
# 'missing': 0,
"name": None,
"pos": None,
# 'quantiles': {},
# 'std': 0,
# 'zeros': 0
}


def _introspect(df: ps.DataFrame) -> Dict[str, Any]:
"""Introspect a PySpark dataframe and return a dictionary of statistics.

:param df: PySpark dataframe to introspect.
:return: Dictionary of column to metadata about it.
"""
fields = df.schema.jsonValue()["fields"]
column_to_metadata = []
for idx, field in enumerate(fields):
values = base_schema.copy()
values.update(
{
"name": field["name"],
"pos": idx,
"data_type": field["type"],
"base_data_type": base_data_type_mapping.get(field["type"], "unhandled"),
"nullable": field["nullable"],
}
)
column_to_metadata.append(values)
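    # Note: the calls below use Spark-internal JVM hooks to capture the query plan
    # as a string; the public DataFrame.explain() prints the plan to stdout instead
    # of returning it, so it can't be captured directly.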
cost_explain = df._sc._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), "cost")
extended_explain = df._sc._jvm.PythonSQLUtils.explainString(
df._jdf.queryExecution(), "extended"
)
return {
"columns": column_to_metadata,
"cost_explain": cost_explain,
"extended_explain": extended_explain,
}


@stats.compute_stats.register
def compute_stats_psdf(result: ps.DataFrame, node_name: str, node_tags: dict) -> Dict[str, Any]:
# TODO: create custom type instead of dict for UI
o_value = _introspect(result)
return {
"observability_type": "dict",
"observability_value": {
"type": str(type(result)),
"value": o_value,
},
"observability_schema_version": "0.0.2",
}


if __name__ == "__main__":
import numpy as np
import pandas as pd

df = pd.DataFrame(
{
"a": [1, 2, 3, 4, 5],
"b": ["a", "b", "c", "d", "e"],
"c": [True, False, True, False, True],
"d": [1.0, 2.0, 3.0, 4.0, 5.0],
"e": pd.Categorical(["a", "b", "c", "d", "e"]),
"f": pd.Series(["a", "b", "c", "d", "e"], dtype="string"),
"g": pd.Series(["a", "b", "c", "d", "e"], dtype="object"),
"h": pd.Series(
["20221231", None, "20221231", "20221231", "20221231"], dtype="datetime64[ns]"
),
"i": pd.Series([None, None, None, None, None], name="a", dtype=np.float64),
"j": pd.Series(name="a", data=pd.date_range("20230101", "20230105")),
}
)
spark = ps.SparkSession.builder.master("local[1]").getOrCreate()
psdf = spark.createDataFrame(df)
import pprint

res = compute_stats_psdf(psdf, "df", {})
pprint.pprint(res)
27 changes: 15 additions & 12 deletions ui/sdk/src/hamilton_sdk/tracking/runs.py
@@ -1,4 +1,5 @@
import functools
import importlib
import logging
import sys
import time as py_time
@@ -14,21 +15,23 @@
from hamilton.data_quality import base as dq_base
from hamilton.lifecycle import base as lifecycle_base

try:
from hamilton_sdk.tracking import numpy_stats # noqa: F401
from hamilton_sdk.tracking import pandas_stats # noqa: F401

except ImportError:
pass

try:
from hamilton_sdk.tracking import polars_stats # noqa: F401

except ImportError:
pass
_modules_to_import = [
"numpy",
"pandas",
"polars",
"pyspark",
"ibis",
]

logger = logging.getLogger(__name__)
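
# Each optional stats module registers a handler for its result type on
# stats.compute_stats when imported (see the @stats.compute_stats.register usages in
# pyspark_stats.py and ibis_stats.py in this PR), so a missing optional dependency
# simply means that handler is never registered.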

for module in _modules_to_import:
try:
importlib.import_module(f"hamilton_sdk.tracking.{module}_stats")
except ImportError:
logger.debug(f"Failed to import hamilton_sdk.tracking.{module}_stats")
pass


def process_result(result: Any, node: h_node.Node) -> Any:
"""Processes result -- this is purely a by-type mapping.