
feat: Add minimal PySpark support #908

Merged: 98 commits into narwhals-dev:main from pyspark, Dec 5, 2024
Conversation

@EdAbati (Collaborator) commented on Sep 3, 2024

What type of PR is this? (check all applicable)

  • πŸ’Ύ Refactor
  • ✨ Feature
  • πŸ› Bug Fix
  • πŸ”§ Optimization
  • πŸ“ Documentation
  • βœ… Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below.

As mentioned in the latest call, I've started working on support for PySpark.

The goal of this PR is to have a minimal initial implementation as a starting point. As we did for Dask, we can implement individual methods in follow-up PRs!

⚠️ This is not ready for review: a lot of tests are failing and the code is ugly. :) Just opening the PR for visibility and to have a place to comment/ask questions on specific points.

github-actions bot added the enhancement (New feature or request) label on Sep 3, 2024
@EdAbati changed the title from "feat: Add Pyspark support" to "feat: Add minimal Pyspark support" on Sep 3, 2024
@EdAbati (Collaborator, Author) commented on Sep 12, 2024

This PR diff is getting big because of all the xfails in tests. 😕
@MarcoGorelli @FBruzzesi do you have a better idea on how to make it more "reviewable"? Or do you think it is fine?

@FBruzzesi (Member) replied:

> This PR diff is getting big because of all the xfails in tests. 😕 @MarcoGorelli @FBruzzesi do you have a better idea on how to make it more "reviewable"? Or do you think it is fine?

For Dask we started with its own test file, so that we didn't have to modify every other file.
Once we had a few methods implemented, we shifted the constructor into the conftest list of constructors and added the xfails.

Would that be a good strategy again?
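A rough sketch of that pattern, assuming a conftest-style list of constructors (the helper and list names here are illustrative, not the PR's actual code):

```python
# Sketch only: how a PySpark constructor might be appended to the shared
# list of constructors in conftest.py.
import pandas as pd
from pyspark.sql import SparkSession


def pyspark_lazy_constructor(obj: dict):  # hypothetical helper name
    session = SparkSession.builder.getOrCreate()
    return session.createDataFrame(pd.DataFrame(obj))


constructors = [...]  # stand-in for the suite's existing constructors
constructors.append(pyspark_lazy_constructor)
```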

@EdAbati changed the title from "feat: Add minimal Pyspark support" to "feat: Add minimal PySpark support" on Nov 21, 2024
@MarcoGorelli (Member) left a review comment:

Seriously awesome and exciting work here, I'm impressed, well done!

Just left some comments / questions

Also, just noting something I noticed:

```
(Pdb) p df.with_columns(d=nw.col('a').mean()).collect().to_native()
*** pyspark.errors.exceptions.captured.AnalysisException: [MISSING_GROUP_BY] The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.;
Aggregate [a#1L, b#2L, c#3L, avg(a#1L) AS d#62]
+- Project [a#1L, b#2L, c#3L]
   +- Sort [index#0L ASC NULLS FIRST], true
      +- Repartition 2, true
         +- LogicalRDD [index#0L, a#1L, b#2L, c#3L], false
```

but we can deal with it later - I presume it shouldn't be too bad?
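(One plausible direction for the later fix, purely a sketch and an assumption about the intended semantics: a window spanning the whole frame broadcasts the aggregate to every row, matching what `with_columns` does in Polars.)

```python
# Sketch only: a bare aggregate like F.mean("a") can't be selected alongside
# other columns in Spark, but wrapping it in a global window computes the
# mean once and attaches it to every row.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["a"])

# Window.partitionBy() with no columns spans the entire frame.
df = df.withColumn("d", F.mean("a").over(Window.partitionBy()))
df.show()
```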

Can we also add a check, in `test_pandas` or `test_polars`, that `pyspark` isn't in `sys.modules` at the end of the test? For reference, the current `test_pandas`:

```python
def test_pandas(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.delitem(sys.modules, "polars")
    monkeypatch.delitem(sys.modules, "pyarrow")
    monkeypatch.delitem(sys.modules, "dask", raising=False)
    monkeypatch.delitem(sys.modules, "ibis", raising=False)
    df = pd.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})
    nw.from_native(df, eager_only=True).group_by("a").agg(nw.col("b").mean()).filter(
        nw.col("a") > 1
    )
    assert "polars" not in sys.modules
    assert "pandas" in sys.modules
    assert "numpy" in sys.modules
    assert "pyarrow" not in sys.modules
    assert "dask" not in sys.modules
    assert "ibis" not in sys.modules
```
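(Concretely, the suggested addition might look like this; an illustrative sketch mirroring the existing lines, not code from the PR:)

```python
# Hypothetical additions to test_pandas / test_polars:
monkeypatch.delitem(sys.modules, "pyspark", raising=False)  # with the other delitem calls
assert "pyspark" not in sys.modules  # with the other assertions
```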

(Three review threads on narwhals/_spark_like/utils.py were marked outdated and resolved.)


Review comment on:

```python
def get_column_name(df: SparkLazyFrame, column: Column) -> str:
    return str(df._native_frame.select(column).columns[0])
```
@MarcoGorelli (Member) commented:

out of interest, is the str here just for typing purposes?

@EdAbati (Collaborator, Author) replied:
yes :)
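(For context, an illustration of what the helper reads back; this example is not from the thread: PySpark derives a string name for any selected Column expression.)

```python
# Illustrative only: the column name PySpark assigns to an expression.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 2)], ["a", "b"])
print(df.select(F.col("a") + 1).columns[0])  # prints "(a + 1)"
```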

Comment on lines +47 to +52:

```python
datetime_types = [
    pyspark_types.TimestampType,
    pyspark_types.TimestampNTZType,
]
if any(isinstance(dtype, t) for t in datetime_types):
    return dtypes.Datetime()
```
@MarcoGorelli (Member) commented:

is there any time_unit / time_zone we should pass to dtypes.Datetime?

@EdAbati (Collaborator, Author) replied on Dec 4, 2024:

Great point!

I think we cannot get it from the type itself, but I need some more time to do some research.

I found some things PySpark does when converting to pandas that could be useful to us: I still have to spend time to understand it properly, but at a brief look it seems the time_unit is set to ns and the time_zone is set to the local timezone.

For Arrow it seems to be different though: https://github.com/apache/spark/blob/master/python/pyspark/sql/pandas/types.py#L66

I don't have an opinion at the moment. What do you think?
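(A speculative sketch of what such a mapping could look like; this is an assumption, not something settled in the thread. Spark timestamps have microsecond precision, TimestampType values are interpreted in the session time zone, and TimestampNTZType is naive.)

```python
# Sketch only: one possible way to carry time_unit / time_zone into
# dtypes.Datetime. `session_tz` would come from the Spark session config
# ("spark.sql.session.timeZone"); everything here is an assumption.
from pyspark.sql import types as pyspark_types


def to_datetime_dtype(dtype, dtypes, session_tz=None):
    if isinstance(dtype, pyspark_types.TimestampNTZType):
        # naive timestamp: no time zone attached
        return dtypes.Datetime(time_unit="us")
    if isinstance(dtype, pyspark_types.TimestampType):
        # tz-aware: Spark interprets these in the session time zone
        return dtypes.Datetime(time_unit="us", time_zone=session_tz)
    msg = f"Unexpected dtype: {dtype}"
    raise TypeError(msg)
```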

@MarcoGorelli (Member) replied on Dec 4, 2024:

sure, happy to defer this, not a blocker

(A review thread on narwhals/_spark_like/utils.py was resolved; another on narwhals/translate.py was marked outdated and resolved.)
@MarcoGorelli (Member) commented:

amazing, thanks for updating! happy to merge once it's green as πŸ₯¦

@EdAbati (Collaborator, Author) commented on Dec 4, 2024

Thank you all for the feedback, and apologies this is taking longer than expected: I am a bit busy.

The CI is almost green 😅 I need to add some if/else logic because pyspark is not available on Python > 3.11. Hopefully it will be fixed tonight.
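(A minimal sketch of the kind of guard meant here, reusing the hypothetical `constructors` list and `pyspark_lazy_constructor` helper from the sketch earlier in the thread; an assumption, not the PR's code:)

```python
import sys

# Sketch only: skip the PySpark constructor on Python versions for which
# pyspark wheels were not yet available (3.12+ at the time).
if sys.version_info < (3, 12):
    constructors.append(pyspark_lazy_constructor)
```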

Regarding the issue with `df.with_columns(d=nw.col('a').mean()).collect().to_native()`: I may have an idea on how to do it. I'll try to squeeze this in too (and add a test). If this takes longer, could it be a follow-up?

@MarcoGorelli (Member) replied:

> If this takes longer, could it be a follow-up?

yup definitely!

@EdAbati (Collaborator, Author) commented on Dec 5, 2024

Ok CI is 🌲🌳

Regarding that issue, I am still working on a fix πŸ₯²

@MarcoGorelli (Member) left a review comment:

thanks @EdAbati for this massive effort

cool, let's ship it before merge conflicts creep in; excited about building this up!

@MarcoGorelli merged commit ea278ae into narwhals-dev:main on Dec 5, 2024 (24 checks passed)
@EdAbati deleted the pyspark branch on Dec 5, 2024, 08:08
@EdAbati (Collaborator, Author) commented on Dec 5, 2024

Exciting πŸŽ‰πŸŽ‰πŸŽ‰ thanks!

Labels: enhancement (New feature or request)
Linked issue: [Enh]: Add Support For PySpark