Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add to_string method to SparkLikeExprDateTimeNamespace #1842

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
45 changes: 45 additions & 0 deletions narwhals/_spark_like/expr_dt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from pyspark.sql import functions as F # noqa: N812

from narwhals._spark_like.utils import strptime_to_pyspark_format

if TYPE_CHECKING:
from pyspark.sql import Column
from typing_extensions import Self
Expand All @@ -15,6 +17,49 @@ class SparkLikeExprDateTimeNamespace:
def __init__(self: Self, expr: SparkLikeExpr) -> None:
self._compliant_expr = expr

def to_string(self: Self, format: str) -> SparkLikeExpr: # noqa: A002
def _format_iso_week_with_day(_input: Column) -> Column:
"""Format datetime as ISO week string with day."""
year = F.date_format(_input, "yyyy")
week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0")
day = F.dayofweek(_input)
# Adjust Sunday from 1 to 7
day = F.when(day == 1, 7).otherwise(day - 1)
return F.concat(year, F.lit("-W"), week, F.lit("-"), day.cast("string"))

def _format_iso_week(_input: Column) -> Column:
"""Format datetime as ISO week string."""
year = F.date_format(_input, "yyyy")
week = F.lpad(F.weekofyear(_input).cast("string"), 2, "0")
return F.concat(year, F.lit("-W"), week)

def _format_iso_datetime(_input: Column) -> Column:
"""Format datetime as ISO datetime with microseconds."""
date_part = F.date_format(_input, "yyyy-MM-dd")
time_part = F.date_format(_input, "HH:mm:ss")
micros = F.unix_micros(_input) % 1_000_000
micros_str = F.lpad(micros.cast("string"), 6, "0")
return F.concat(date_part, F.lit("T"), time_part, F.lit("."), micros_str)

def _to_string(_input: Column) -> Column:
# Handle special formats
if format == "%G-W%V":
return _format_iso_week(_input)
if format == "%G-W%V-%u":
return _format_iso_week_with_day(_input)
if format in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S%.f"):
return _format_iso_datetime(_input)

# Convert Python format to PySpark format
pyspark_fmt = strptime_to_pyspark_format(format)
return F.date_format(_input, pyspark_fmt)

return self._compliant_expr._from_call(
_to_string,
"to_string",
returns_scalar=self._compliant_expr._returns_scalar,
)

def date(self: Self) -> SparkLikeExpr:
return self._compliant_expr._from_call(
F.to_date,
Expand Down
44 changes: 2 additions & 42 deletions narwhals/_spark_like/expr_str.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import overload

from pyspark.sql import functions as F # noqa: N812

from narwhals._spark_like.utils import strptime_to_pyspark_format

if TYPE_CHECKING:
from pyspark.sql import Column
from typing_extensions import Self
Expand Down Expand Up @@ -123,44 +124,3 @@ def to_datetime(self: Self, format: str | None) -> SparkLikeExpr: # noqa: A002
"to_datetime",
returns_scalar=self._compliant_expr._returns_scalar,
)


@overload
def strptime_to_pyspark_format(format: None) -> None: ...


@overload
def strptime_to_pyspark_format(format: str) -> str: ...


def strptime_to_pyspark_format(format: str | None) -> str | None: # noqa: A002
"""Converts a Python strptime datetime format string to a PySpark datetime format string."""
# Mapping from Python strptime format to PySpark format
if format is None:
return None

# see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
format_mapping = {
"%Y": "y", # Year with century
"%y": "y", # Year without century
"%m": "M", # Month
"%d": "d", # Day of the month
"%H": "H", # Hour (24-hour clock) 0-23
"%I": "h", # Hour (12-hour clock) 1-12
"%M": "m", # Minute
"%S": "s", # Second
"%f": "S", # Microseconds -> Milliseconds
"%p": "a", # AM/PM
"%a": "E", # Abbreviated weekday name
"%A": "E", # Full weekday name
"%j": "D", # Day of the year
"%z": "Z", # Timezone offset
"%s": "X", # Unix timestamp
}

# Replace Python format specifiers with PySpark specifiers
pyspark_format = format
for py_format, spark_format in format_mapping.items():
pyspark_format = pyspark_format.replace(py_format, spark_format)
return pyspark_format.replace("T", " ")
41 changes: 41 additions & 0 deletions narwhals/_spark_like/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import lru_cache
from typing import TYPE_CHECKING
from typing import Any
from typing import overload

from pyspark.sql import functions as F # noqa: N812

Expand Down Expand Up @@ -195,3 +196,43 @@ def _var(_input: Column | str, ddof: int, np_version: tuple[int, ...]) -> Column

input_col = F.col(_input) if isinstance(_input, str) else _input
return var(input_col, ddof=ddof)


@overload
def strptime_to_pyspark_format(format: None) -> None: ...


@overload
def strptime_to_pyspark_format(format: str) -> str: ...


def strptime_to_pyspark_format(format: str | None) -> str | None: # noqa: A002
"""Converts a Python strptime datetime format string to a PySpark datetime format string."""
if format is None:
return None

# see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# and https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
format_mapping = {
"%Y": "yyyy", # Year with century (4 digits)
"%y": "yy", # Year without century (2 digits)
"%m": "MM", # Month (01-12)
"%d": "dd", # Day of the month (01-31)
"%H": "HH", # Hour (24-hour clock) (00-23)
"%I": "hh", # Hour (12-hour clock) (01-12)
"%M": "mm", # Minute (00-59)
"%S": "ss", # Second (00-59)
"%f": "S", # Microseconds -> Milliseconds
"%p": "a", # AM/PM
"%a": "E", # Abbreviated weekday name
"%A": "E", # Full weekday name
"%j": "D", # Day of the year
"%z": "Z", # Timezone offset
"%s": "X", # Unix timestamp
}

# Replace Python format specifiers with PySpark specifiers
pyspark_format = format
for py_format, spark_format in format_mapping.items():
pyspark_format = pyspark_format.replace(py_format, spark_format)
return pyspark_format.replace("T", " ")
5 changes: 3 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,16 @@ def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]: # pragma: no cove
"ignore", r"Using fork\(\) can cause Polars", category=RuntimeWarning
)

# common timezone for all tests environments
os.environ["TZ"] = "UTC"

session = (
SparkSession.builder.appName("unit-tests")
.master("local[1]")
.config("spark.ui.enabled", "false")
# executing one task at a time makes the tests faster
.config("spark.default.parallelism", "1")
.config("spark.sql.shuffle.partitions", "2")
# common timezone for all tests environments
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate()
)

Expand Down
11 changes: 2 additions & 9 deletions tests/expr_and_series/dt/to_string_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ def test_dt_to_string_series(constructor_eager: ConstructorEager, fmt: str) -> N
],
)
@pytest.mark.skipif(is_windows(), reason="pyarrow breaking on windows")
def test_dt_to_string_expr(
constructor: Constructor, fmt: str, request: pytest.FixtureRequest
) -> None:
if "pyspark" in str(constructor):
request.applymarker(pytest.mark.xfail)
def test_dt_to_string_expr(constructor: Constructor, fmt: str) -> None:
input_frame = nw.from_native(constructor(data))

expected_col = [datetime.strftime(d, fmt) for d in data["a"]]
Expand Down Expand Up @@ -141,7 +137,7 @@ def test_dt_to_string_iso_local_datetime_expr(
expected: str,
request: pytest.FixtureRequest,
) -> None:
if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
if "duckdb" in str(constructor):
request.applymarker(pytest.mark.xfail)
df = constructor({"a": [data]})

Expand Down Expand Up @@ -178,10 +174,7 @@ def test_dt_to_string_iso_local_date_expr(
constructor: Constructor,
data: datetime,
expected: str,
request: pytest.FixtureRequest,
) -> None:
if "pyspark" in str(constructor):
request.applymarker(pytest.mark.xfail)
df = constructor({"a": [data]})
result = nw.from_native(df).with_columns(
nw.col("a").dt.to_string("%Y-%m-%d").alias("b")
Expand Down
Loading